From 21d8925126afdaf9be2a1e88d2edc68be4dd7844 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 7 Jan 2025 22:21:13 +0800 Subject: [PATCH 1/2] [Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema (#46347) During a schema change that changes the key columns, a partial update on the shadow index's tablet may produce duplicate keys, because the columns included in the load may not contain every key column of the new schema. So we abort the partial update in this situation. --- be/src/cloud/cloud_rowset_builder.cpp | 143 ++++++++++++++++++ be/src/olap/delta_writer_v2.cpp | 23 +-- be/src/olap/delta_writer_v2.h | 6 +- be/src/olap/partial_update_info.cpp | 27 +++- be/src/olap/partial_update_info.h | 9 +- .../segment_v2/vertical_segment_writer.cpp | 23 ++- be/src/olap/rowset_builder.cpp | 18 ++- be/src/olap/rowset_builder.h | 6 +- be/src/olap/schema_change.cpp | 1 + .../apache/doris/alter/SchemaChangeJobV2.java | 1 + .../test_add_key_partial_update.out | 17 +++ .../test_add_key_partial_update.groovy | 74 +++++++++ 12 files changed, 310 insertions(+), 38 deletions(-) create mode 100644 be/src/cloud/cloud_rowset_builder.cpp create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp new file mode 100644 index 00000000000000..9466dd1062803e --- /dev/null +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_rowset_builder.h" + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "olap/storage_policy.h" + +namespace doris { +using namespace ErrorCode; + +CloudRowsetBuilder::CloudRowsetBuilder(CloudStorageEngine& engine, const WriteRequest& req, + RuntimeProfile* profile) + : BaseRowsetBuilder(req, profile), _engine(engine) {} + +CloudRowsetBuilder::~CloudRowsetBuilder() = default; + +Status CloudRowsetBuilder::init() { + _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id)); + + std::shared_ptr mow_context; + if (_tablet->enable_unique_key_merge_on_write()) { + auto st = std::static_pointer_cast(_tablet)->sync_rowsets(); + // sync_rowsets will return INVALID_TABLET_STATE when tablet is under alter + if (!st.ok() && !st.is()) { + return st; + } + RETURN_IF_ERROR(init_mow_context(mow_context)); + } + RETURN_IF_ERROR(check_tablet_version_count()); + + using namespace std::chrono; + std::static_pointer_cast(_tablet)->last_load_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + + // build tablet schema in request level + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); + + RowsetWriterContext context; + context.txn_id = _req.txn_id; + context.txn_expiration = _req.txn_expiration; + context.load_id = _req.load_id; + context.rowset_state = PREPARED; + context.segments_overlap = OVERLAPPING; + context.tablet_schema = _tablet_schema; + 
context.newest_write_timestamp = UnixSeconds(); + context.tablet_id = _req.tablet_id; + context.index_id = _req.index_id; + context.tablet = _tablet; + context.write_type = DataWriteType::TYPE_DIRECT; + context.mow_context = mow_context; + context.write_file_cache = _req.write_file_cache; + context.partial_update_info = _partial_update_info; + context.file_cache_ttl_sec = _tablet->ttl_seconds(); + context.storage_resource = _engine.get_storage_resource(_req.storage_vault_id); + if (!context.storage_resource) { + return Status::InternalError("vault id not found, maybe not sync, vault id {}", + _req.storage_vault_id); + } + + _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); + + _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); + + RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta())); + + _is_init = true; + return Status::OK(); +} + +Status CloudRowsetBuilder::check_tablet_version_count() { + int version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0); + // TODO(plat1ko): load backoff algorithm + if (version_count > config::max_tablet_version_num) { + return Status::Error( + "failed to init rowset builder. version count: {}, exceed limit: {}, " + "tablet: {}. 
Please reduce the frequency of loading data or adjust the " + "max_tablet_version_num in be.conf to a larger value.", + version_count, config::max_tablet_version_num, _tablet->tablet_id()); + } + return Status::OK(); +} + +void CloudRowsetBuilder::update_tablet_stats() { + auto* tablet = cloud_tablet(); + DCHECK(tablet); + DCHECK(_rowset); + tablet->fetch_add_approximate_num_rowsets(1); + tablet->fetch_add_approximate_num_segments(_rowset->num_segments()); + tablet->fetch_add_approximate_num_rows(_rowset->num_rows()); + tablet->fetch_add_approximate_data_size(_rowset->total_disk_size()); + tablet->fetch_add_approximate_cumu_num_rowsets(1); + tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments()); + tablet->write_count.fetch_add(1, std::memory_order_relaxed); +} + +CloudTablet* CloudRowsetBuilder::cloud_tablet() { + return static_cast(_tablet.get()); +} + +const RowsetMetaSharedPtr& CloudRowsetBuilder::rowset_meta() { + return _rowset_writer->rowset_meta(); +} + +Status CloudRowsetBuilder::set_txn_related_delete_bitmap() { + if (_tablet->enable_unique_key_merge_on_write()) { + if (config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) { + auto st = _tablet->check_delete_bitmap_correctness( + _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids); + if (!st.ok()) { + LOG(WARNING) << fmt::format( + "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " + "delete bitmap correctness check failed in commit phase!", + _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), + _req.partition_id); + return st; + } + } + _engine.txn_delete_bitmap_cache().set_tablet_txn_info( + _req.txn_id, _tablet->tablet_id(), _delete_bitmap, _rowset_ids, _rowset, + _req.txn_expiration, _partial_update_info); + } + return Status::OK(); +} +} // namespace doris diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index aaa35a0c2a949a..cb84134658d262 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ 
b/be/src/olap/delta_writer_v2.cpp @@ -102,8 +102,8 @@ Status DeltaWriterV2::init() { if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { return Status::InternalError("failed to find tablet schema for {}", _req.index_id); } - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_streams[0]->tablet_schema(_req.index_id)); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_streams[0]->tablet_schema(_req.index_id))); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -209,9 +209,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) { return Status::OK(); } -void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { _tablet_schema->copy_from(ori_tablet_schema); // find the right index id int i = 0; @@ -235,12 +235,13 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), - table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + RETURN_IF_ERROR(_partial_update_info->init( + _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), + table_schema_param->nano_seconds(), table_schema_param->timezone(), + 
table_schema_param->auto_increment_coulumn())); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 0ef564be393762..418ad098e57ae4 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -85,9 +85,9 @@ class DeltaWriterV2 { Status cancel_with_status(const Status& st); private: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _update_profile(RuntimeProfile* profile); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index c75e6c554ea11c..aa27f6b680c668 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -27,11 +27,11 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, - int64_t cur_max_version) { +Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set& partial_update_cols, + bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; @@ -40,6 +40,22 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd this->timezone = timezone; missing_cids.clear(); update_cids.clear(); + + if (is_partial_update) { + // partial_update_cols should include all key columns + for 
(std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { + const auto key_col = tablet_schema.column(i); + if (!partial_update_cols.contains(key_col.name())) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Missing key column {}.", + tablet_id, txn_id, key_col.name()); + LOG_WARNING(msg); + return Status::Aborted(msg); + } + } + } + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { auto tablet_column = tablet_schema.column(i); if (!partial_update_input_columns.contains(tablet_column.name())) { @@ -59,6 +75,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd is_input_columns_contains_auto_inc_column = is_partial_update && partial_update_input_columns.contains(auto_increment_column); _generate_default_values_for_missing_cids(tablet_schema); + return Status::OK(); } void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 2d94361f503d79..e63e92cd19b8ce 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -26,10 +26,11 @@ class TabletSchema; class PartialUpdateInfoPB; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, - const std::string& auto_increment_column, int64_t cur_max_version = -1); + Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set& partial_update_cols, + bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, const std::string& auto_increment_column, + int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); std::string 
summary() const; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 1be1f1ec180bda..8e2f70aea57de4 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -54,6 +54,7 @@ #include "service/point_query_executor.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/debug_points.h" #include "util/faststring.h" #include "util/key_util.h" #include "vec/columns/column_nullable.h" @@ -346,10 +347,6 @@ Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos // 3. set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block) { - if constexpr (!std::is_same_v) { - // TODO(plat1ko): CloudStorageEngine - return Status::NotSupported("append_block_with_partial_content"); - } RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos)); @@ -981,8 +978,26 @@ Status VerticalSegmentWriter::write_batch() { std::string VerticalSegmentWriter::_full_encode_keys( const std::vector& key_columns, size_t pos) { +<<<<<<< HEAD assert(_key_index_size.size() == _num_key_columns); assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); +======= + assert(_key_index_size.size() == _num_sort_key_columns); + if (!(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns)) { + LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ", + key_columns.size(), _key_coders.size(), _num_sort_key_columns); + } + assert(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns); + return _full_encode_keys(_key_coders, key_columns, pos); +} + +std::string VerticalSegmentWriter::_full_encode_keys( + const std::vector& key_coders, + const std::vector& key_columns, 
size_t pos) { + assert(key_columns.size() == key_coders.size()); +>>>>>>> 0a54978dc7 ([Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema (#46347)) std::string encoded_keys; size_t cid = 0; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index c668df4bd33141..e9da68bf9cc2e5 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -214,8 +214,8 @@ Status RowsetBuilder::init() { }; }) // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -377,9 +377,9 @@ Status BaseRowsetBuilder::cancel() { return Status::OK(); } -void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status BaseRowsetBuilder::_build_current_tablet_schema( + int64_t index_id, const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { // find the right index id int i = 0; auto indexes = table_schema_param->indexes(); @@ -419,12 +419,14 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init( - *_tablet_schema, table_schema_param->is_partial_update(), + RETURN_IF_ERROR(_partial_update_info->init( + tablet()->tablet_id(), _req.txn_id, *_tablet_schema, + table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), - 
table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase); + table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase)); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 362e976da71976..3e29ee7f5e9f65 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -85,9 +85,9 @@ class BaseRowsetBuilder { } protected: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _init_profile(RuntimeProfile* profile); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index b5cc293bdff387..49c3cc8e188081 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -723,6 +723,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req Status res = _do_process_alter_tablet_v2(request); LOG(INFO) << "finished alter tablet process, res=" << res; + DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); }); return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index e98d8066a1135c..3e945e06e8b8e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -537,6 +537,7 @@ protected void runRunningJob() throws AlterCancelException { Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); + /* * all tasks are finished. check the integrity. 
* we just check whether all new replicas are healthy. diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out new file mode 100644 index 00000000000000..87d44dac59e4f4 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 + +-- !sql -- +1 \N 1 1 2 0 +2 \N 2 2 2 0 +3 \N 3 3 2 0 +4 \N 4 4 5 0 +5 \N 5 5 5 0 +6 \N 6 6 5 0 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy new file mode 100644 index 00000000000000..61ba9d60ea8371 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_add_key_partial_update", "nonConcurrent") { + + def table1 = "test_add_key_partial_update" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1),(2,2,2),(3,3,3);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the schema change process before it change the shadow index to base index + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep") + + sql "alter table ${table1} ADD COLUMN k2 int key;" + + Thread.sleep(1000) + test { + sql "delete from ${table1} where k1<=3;" + exception "Unable to do partial update on shadow index's tablet" + } + + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + sql "set skip_delete_sign=true;" + sql "sync;" + qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} From f2b43029fe101c93aeb7a310fe5ca8589a72763c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 8 Jan 2025 11:23:31 +0800 Subject: [PATCH 2/2] fix Remove be/src/cloud/cloud_rowset_builder.cpp, which the previous patch added, and remove the unresolved merge-conflict markers (together with the incoming side of the conflict) that the previous patch left in vertical_segment_writer.cpp, keeping the HEAD version of _full_encode_keys. --- 
be/src/cloud/cloud_rowset_builder.cpp | 143 ------------------ .../segment_v2/vertical_segment_writer.cpp | 18 --- 2 files changed, 161 deletions(-) delete mode 100644 be/src/cloud/cloud_rowset_builder.cpp diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp deleted file mode 100644 index 9466dd1062803e..00000000000000 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "cloud/cloud_rowset_builder.h" - -#include "cloud/cloud_meta_mgr.h" -#include "cloud/cloud_storage_engine.h" -#include "cloud/cloud_tablet.h" -#include "cloud/cloud_tablet_mgr.h" -#include "olap/storage_policy.h" - -namespace doris { -using namespace ErrorCode; - -CloudRowsetBuilder::CloudRowsetBuilder(CloudStorageEngine& engine, const WriteRequest& req, - RuntimeProfile* profile) - : BaseRowsetBuilder(req, profile), _engine(engine) {} - -CloudRowsetBuilder::~CloudRowsetBuilder() = default; - -Status CloudRowsetBuilder::init() { - _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id)); - - std::shared_ptr mow_context; - if (_tablet->enable_unique_key_merge_on_write()) { - auto st = std::static_pointer_cast(_tablet)->sync_rowsets(); - // sync_rowsets will return INVALID_TABLET_STATE when tablet is under alter - if (!st.ok() && !st.is()) { - return st; - } - RETURN_IF_ERROR(init_mow_context(mow_context)); - } - RETURN_IF_ERROR(check_tablet_version_count()); - - using namespace std::chrono; - std::static_pointer_cast(_tablet)->last_load_time_ms = - duration_cast(system_clock::now().time_since_epoch()).count(); - - // build tablet schema in request level - RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema())); - - RowsetWriterContext context; - context.txn_id = _req.txn_id; - context.txn_expiration = _req.txn_expiration; - context.load_id = _req.load_id; - context.rowset_state = PREPARED; - context.segments_overlap = OVERLAPPING; - context.tablet_schema = _tablet_schema; - context.newest_write_timestamp = UnixSeconds(); - context.tablet_id = _req.tablet_id; - context.index_id = _req.index_id; - context.tablet = _tablet; - context.write_type = DataWriteType::TYPE_DIRECT; - context.mow_context = mow_context; - context.write_file_cache = _req.write_file_cache; - context.partial_update_info = _partial_update_info; - context.file_cache_ttl_sec = _tablet->ttl_seconds(); - context.storage_resource 
= _engine.get_storage_resource(_req.storage_vault_id); - if (!context.storage_resource) { - return Status::InternalError("vault id not found, maybe not sync, vault id {}", - _req.storage_vault_id); - } - - _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); - - _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); - - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta())); - - _is_init = true; - return Status::OK(); -} - -Status CloudRowsetBuilder::check_tablet_version_count() { - int version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0); - // TODO(plat1ko): load backoff algorithm - if (version_count > config::max_tablet_version_num) { - return Status::Error( - "failed to init rowset builder. version count: {}, exceed limit: {}, " - "tablet: {}. Please reduce the frequency of loading data or adjust the " - "max_tablet_version_num in be.conf to a larger value.", - version_count, config::max_tablet_version_num, _tablet->tablet_id()); - } - return Status::OK(); -} - -void CloudRowsetBuilder::update_tablet_stats() { - auto* tablet = cloud_tablet(); - DCHECK(tablet); - DCHECK(_rowset); - tablet->fetch_add_approximate_num_rowsets(1); - tablet->fetch_add_approximate_num_segments(_rowset->num_segments()); - tablet->fetch_add_approximate_num_rows(_rowset->num_rows()); - tablet->fetch_add_approximate_data_size(_rowset->total_disk_size()); - tablet->fetch_add_approximate_cumu_num_rowsets(1); - tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments()); - tablet->write_count.fetch_add(1, std::memory_order_relaxed); -} - -CloudTablet* CloudRowsetBuilder::cloud_tablet() { - return static_cast(_tablet.get()); -} - -const RowsetMetaSharedPtr& CloudRowsetBuilder::rowset_meta() { - return _rowset_writer->rowset_meta(); -} - -Status CloudRowsetBuilder::set_txn_related_delete_bitmap() { - if (_tablet->enable_unique_key_merge_on_write()) { - if 
(config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) { - auto st = _tablet->check_delete_bitmap_correctness( - _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids); - if (!st.ok()) { - LOG(WARNING) << fmt::format( - "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " - "delete bitmap correctness check failed in commit phase!", - _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), - _req.partition_id); - return st; - } - } - _engine.txn_delete_bitmap_cache().set_tablet_txn_info( - _req.txn_id, _tablet->tablet_id(), _delete_bitmap, _rowset_ids, _rowset, - _req.txn_expiration, _partial_update_info); - } - return Status::OK(); -} -} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 8e2f70aea57de4..348dec4835d779 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -978,26 +978,8 @@ Status VerticalSegmentWriter::write_batch() { std::string VerticalSegmentWriter::_full_encode_keys( const std::vector& key_columns, size_t pos) { -<<<<<<< HEAD assert(_key_index_size.size() == _num_key_columns); assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); -======= - assert(_key_index_size.size() == _num_sort_key_columns); - if (!(key_columns.size() == _num_sort_key_columns && - _key_coders.size() == _num_sort_key_columns)) { - LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ", - key_columns.size(), _key_coders.size(), _num_sort_key_columns); - } - assert(key_columns.size() == _num_sort_key_columns && - _key_coders.size() == _num_sort_key_columns); - return _full_encode_keys(_key_coders, key_columns, pos); -} - -std::string VerticalSegmentWriter::_full_encode_keys( - const std::vector& key_coders, - const std::vector& key_columns, size_t pos) { - 
assert(key_columns.size() == key_coders.size()); ->>>>>>> 0a54978dc7 ([Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema (#46347)) std::string encoded_keys; size_t cid = 0;