diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 87316c3cfbae2a..3634e19c68c48e 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -164,8 +164,7 @@ Status DeltaWriter::init() { std::lock_guard lck(_tablet->get_header_lock()); _cur_max_version = _tablet->max_version_unlocked().second; // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + if (_tablet->tablet_state() == TABLET_NOTREADY) { // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' if (_req.table_schema_param->is_partial_update()) { return Status::InternalError( @@ -174,7 +173,7 @@ Status DeltaWriter::init() { } _rowset_ids.clear(); } else { - _rowset_ids = _tablet->all_rs_id(_cur_max_version); + RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, &_rowset_ids)); } } @@ -459,8 +458,7 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() { std::lock_guard l(_lock); // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + if (_tablet->tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " "tablet_id: " << _tablet->tablet_id() << " txn_id: " << _req.txn_id; @@ -469,11 +467,6 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() { auto beta_rowset = reinterpret_cast(_cur_rowset.get()); std::vector segments; RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - return Status::OK(); - } if (segments.size() > 1) { // calculate delete bitmap between segments RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, @@ -510,8 +503,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool write_single_replica) { if (_tablet->enable_unique_key_merge_on_write() && config::enable_merge_on_write_correctness_check && _cur_rowset->num_rows() != 0 && - !(_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id()))) { + _tablet->tablet_state() != TABLET_NOTREADY) { auto st = _tablet->check_delete_bitmap_correctness( _delete_bitmap, _cur_rowset->end_version() - 1, _req.txn_id, _rowset_ids); if (!st.ok()) { diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 9b832ec555240a..e95d6750ad3342 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -147,8 +147,7 @@ Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP std::vector tmp_rowsets {}; // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + if (_tablet->tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" << _tablet->tablet_id(); return Status::OK(); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 740255b4325840..acd468a47e83c4 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -916,12 +916,9 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& _tablet_ids_in_converting.insert(new_tablet->tablet_id()); } res = _convert_historical_rowsets(sc_params); - if (new_tablet->keys_type() != UNIQUE_KEYS || - !new_tablet->enable_unique_key_merge_on_write() || !res) { - { - std::lock_guard wrlock(_mutex); - _tablet_ids_in_converting.erase(new_tablet->tablet_id()); - } + { + std::lock_guard wrlock(_mutex); + _tablet_ids_in_converting.erase(new_tablet->tablet_id()); } if (!res) { break; @@ -981,10 +978,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } // step 4 - { - std::lock_guard wrlock(_mutex); - _tablet_ids_in_converting.erase(new_tablet->tablet_id()); - } res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); if (!res) { break; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 52d5072f7d338c..a4ecf33a0bb257 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1082,6 +1082,13 @@ void StorageEngine::start_delete_unused_rowset() { VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id() << ", version:" << it->second->version().first << "-" << it->second->version().second; + auto tablet_id = it->second->rowset_meta()->tablet_id(); + auto tablet = _tablet_manager->get_tablet(tablet_id); + // delete delete_bitmap of unused rowsets + if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { + tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0}, + {it->second->rowset_id(), UINT32_MAX, 0}); + } Status status = it->second->remove(); VLOG_NOTICE << "remove rowset:" << it->second->rowset_id() << " finished. status:" << status; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 527a8b1c37223d..8a18f0d1d1b4ca 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3320,7 +3320,8 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) << tablet_id() << " cur max_version: " << cur_version; return Status::OK(); } - RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1); + RowsetIdUnorderedSet cur_rowset_ids; + RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); DeleteBitmapPtr delete_bitmap = std::make_shared(tablet_id()); RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); @@ -3370,7 +3371,7 @@ Status Tablet::commit_phase_update_delete_bitmap( { std::shared_lock meta_rlock(_meta_lock); cur_version = max_version_unlocked().second; - cur_rowset_ids = all_rs_id(cur_version); + RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids)); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); @@ -3411,13 +3412,12 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, { std::shared_lock meta_rlock(_meta_lock); // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet_id())) { + if (tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" << tablet_id(); return Status::OK(); } - cur_rowset_ids = all_rs_id(cur_version - 1); + RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); } auto t2 = watch.get_elapse_time_us(); @@ -3454,7 +3454,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id() << ", rowset_ids to add: " << rowset_ids_to_add.size() << ", rowset_ids to del: " << rowset_ids_to_del.size() - << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << "," + << ", cur version: " << cur_version << ", transaction_id: " << txn_id << "," << ss.str() << " , total rows: " << total_rows; if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) { @@ -3586,18 +3586,24 @@ Status Tablet::check_rowid_conversion( return Status::OK(); } -RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const { - RowsetIdUnorderedSet rowset_ids; - for (const auto& rs_it : _rs_version_map) { - if (rs_it.first.second == 1) { +Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { + // Ensure that the obtained versions of rowsets are continuous + std::vector version_path; + RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), &version_path)); + for (auto& ver : version_path) { + if (ver.second == 1) { // [0-1] rowset is empty for each tablet, skip it continue; } - if (rs_it.first.second <= max_version) { - rowset_ids.insert(rs_it.second->rowset_id()); + auto it = _rs_version_map.find(ver); + if (it == _rs_version_map.end()) { + return Status::Error( + "fail to find Rowset for version. tablet={}, version={}", tablet_id(), + ver.to_string()); } + rowset_ids->emplace(it->second->rowset_id()); } - return rowset_ids; + return Status::OK(); } bool Tablet::check_all_rowset_segment() { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 9e665927d31f4b..7fbf2b9a98f3a7 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -508,7 +508,7 @@ class Tablet : public BaseTablet { RowsetSharedPtr dst_rowset, const std::map>>& location_map); - RowsetIdUnorderedSet all_rs_id(int64_t max_version) const; + Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const; void sort_block(vectorized::Block& in_block, vectorized::Block& output_block); bool check_all_rowset_segment(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index b407e35a449046..69ea6c8d4e880b 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -765,11 +765,6 @@ void TabletMeta::modify_rs_metas(const std::vector& to_add, ++it; } } - // delete delete_bitmap of to_delete's rowsets if not added to _stale_rs_metas. - if (same_version && _enable_unique_key_merge_on_write) { - delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0}, - {rs_to_del->rowset_id(), UINT32_MAX, 0}); - } } if (!same_version) { // put to_delete rowsets in _stale_rs_metas. diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 4eb43864becdf9..3621a958eff80c 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -111,6 +111,16 @@ Status EngineCloneTask::_do_clone() { // Check local tablet exist or not TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id); + + // The status of a tablet is not ready, indicating that it is a residual tablet after a schema + // change failure. It should not provide normal read and write, so drop it here. + if (tablet && tablet->tablet_state() == TABLET_NOTREADY) { + LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id=" + << tablet->tablet_id(); + RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet( + tablet->tablet_id(), tablet->replica_id(), false)); + tablet.reset(); + } bool is_new_tablet = tablet == nullptr; // try to incremental clone std::vector missed_versions; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 702c4386f11c3c..d6642705bafbf6 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -153,26 +153,24 @@ Status EnginePublishVersionTask::finish() { StorageEngine::instance()->txn_manager()->update_tablet_version_txn( tablet_info.tablet_id, version.second, transaction_id); } - Version max_version; + int64_t max_version; TabletState tablet_state; { std::shared_lock rdlock(tablet->get_header_lock()); - max_version = tablet->max_version_unlocked(); + max_version = tablet->max_version_unlocked().second; tablet_state = tablet->tablet_state(); } - if (tablet_state == TabletState::TABLET_RUNNING && - version.first != max_version.second + 1) { - // If a tablet migrates out and back, the previously failed - // publish task may retry on the new tablet, so check - // whether the version exists. if not exist, then set - // publish failed - if (!tablet->check_version_exist(version)) { + if (version.first != max_version + 1) { + if (tablet->check_version_exist(version)) { + continue; + } + auto handle_version_not_continuous = [&]() { add_error_tablet_id(tablet_info.tablet_id); _discontinuous_version_tablets->emplace_back( partition_id, tablet_info.tablet_id, version.first); res = Status::Error( "check_version_exist failed"); - int64_t missed_version = max_version.second + 1; + int64_t missed_version = max_version + 1; int64_t missed_txn_id = StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( tablet->tablet_id(), missed_version); @@ -187,8 +185,20 @@ Status EnginePublishVersionTask::finish() { } else { LOG_EVERY_SECOND(INFO) << msg; } + }; + // The versions during the schema change period need to be also continuous + if (tablet_state == TabletState::TABLET_NOTREADY) { + Version max_continuous_version = {-1, 0}; + tablet->max_continuous_version_from_beginning(&max_continuous_version); + if (max_version > 1 && version.first > max_version && + max_continuous_version.second != max_version) { + handle_version_not_continuous(); + continue; + } + } else { + handle_version_not_continuous(); + continue; } - continue; } }