From 9729b0ed62e0b3b267297746b1c2a97bdb0f5caf Mon Sep 17 00:00:00 2001 From: platoneko Date: Fri, 17 Feb 2023 20:26:12 +0800 Subject: [PATCH] Fix inconsistent replica id between BE and FE in corner case of tablet rebalance --- be/src/olap/tablet_manager.cpp | 14 ++++------- be/src/olap/tablet_manager.h | 3 ++- be/src/olap/task/engine_clone_task.cpp | 24 +++++++++---------- .../apache/doris/master/ReportHandler.java | 9 +++++-- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 45a98bf5bc22c8..129f1046cc40a2 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -444,8 +444,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, auto& shard = _get_tablets_shard(tablet_id); std::lock_guard wrlock(shard.lock); if (shard.tablets_under_clone.count(tablet_id) > 0) { - LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task"; - return Status::Aborted("aborted"); + return Status::Aborted("tablet {} is under clone, skip drop task", tablet_id); } SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition); @@ -467,12 +466,9 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl // We should compare replica id to avoid dropping new cloned tablet. // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { - LOG(WARNING) << "fail to drop tablet because replica id not match. 
" - << "tablet_id=" << tablet_id << ", replica_id=" << to_drop_tablet->replica_id() - << ", request replica_id=" << replica_id; - return Status::Aborted("aborted"); + return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(), + replica_id); } - _remove_tablet_from_partition(to_drop_tablet); tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map.erase(tablet_id); @@ -1025,10 +1021,10 @@ Status TabletManager::start_trash_sweep() { return Status::OK(); } // start_trash_sweep -void TabletManager::register_clone_tablet(int64_t tablet_id) { +bool TabletManager::register_clone_tablet(int64_t tablet_id) { tablets_shard& shard = _get_tablets_shard(tablet_id); std::lock_guard wrlock(shard.lock); - shard.tablets_under_clone.insert(tablet_id); + return shard.tablets_under_clone.insert(tablet_id).second; } void TabletManager::unregister_clone_tablet(int64_t tablet_id) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index b1f3300596dac8..0cda4b6edf92c0 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -147,7 +147,8 @@ class TabletManager { void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); - void register_clone_tablet(int64_t tablet_id); + // return `true` if register success + bool register_clone_tablet(int64_t tablet_id); void unregister_clone_tablet(int64_t tablet_id); void get_tablets_distribution_on_different_disks( diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 179c8b8e5beaca..464a1433efd112 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -68,7 +68,9 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& Status EngineCloneTask::execute() { // register the tablet to avoid it is deleted by gc thread during clone process SCOPED_ATTACH_TASK(_mem_tracker); - 
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id); + if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) { + return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id); + } Status st = _do_clone(); StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); return st; @@ -86,6 +88,13 @@ Status EngineCloneTask::_do_clone() { std::vector missed_versions; // try to repair a tablet with missing version if (tablet != nullptr) { + if (tablet->replica_id() != _clone_req.replica_id) { + // `tablet` may be a dropped replica in FE, e.g: BE1 migrates replica of tablet_1 to BE2, + // but before BE1 drop this replica, another new replica of tablet_1 is migrated to BE1. + // If we allow to clone success on dropped replica, replica id may never be consistent between FE and BE. + return Status::InternalError("replica_id not match({} vs {})", tablet->replica_id(), + _clone_req.replica_id); + } std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock); if (!migration_rlock.owns_lock()) { return Status::Error(); @@ -101,15 +110,7 @@ Status EngineCloneTask::_do_clone() { // completed. Or remote be will just return header not the rowset files. clone will failed. if (missed_versions.empty()) { LOG(INFO) << "missed version size = 0, skip clone and return success. 
tablet_id=" - << _clone_req.tablet_id << " req replica=" << _clone_req.replica_id; - if (_clone_req.replica_id != tablet->replica_id()) { - // update replica id to meet cooldown replica - tablet->tablet_meta()->set_replica_id(_clone_req.replica_id); - { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->save_meta(); - } - } + << _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id; _set_tablet_info(is_new_tablet); return Status::OK(); } @@ -118,7 +119,7 @@ Status EngineCloneTask::_do_clone() { << ", allow_incremental_clone=" << allow_incremental_clone << ", signature=" << _signature << ", tablet_id=" << _clone_req.tablet_id << ", committed_version=" << _clone_req.committed_version - << ", req replica=" << _clone_req.replica_id; + << ", replica_id=" << _clone_req.replica_id; // try to download missing version from src backend. // if tablet on src backend does not contains missing version, it will download all versions, @@ -126,7 +127,6 @@ Status EngineCloneTask::_do_clone() { RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path, &src_host, &src_file_path, missed_versions, &allow_incremental_clone)); - RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version, allow_incremental_clone)); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 5ed52287c4d6ce..61c8d4e6cfe451 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -1086,6 +1086,12 @@ private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist"); } + // check replica id + long replicaId = backendTabletInfo.getReplicaId(); + if (replicaId <= 0) { + throw new MetaNotFoundException("replica id is invalid"); + } + long 
visibleVersion = partition.getVisibleVersion(); // check replica version @@ -1129,8 +1135,7 @@ private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo } else if (version < partition.getCommittedVersion()) { lastFailedVersion = partition.getCommittedVersion(); } - - long replicaId = Env.getCurrentEnv().getNextId(); + // use replicaId reported by BE to maintain replica meta consistent between FE and BE Replica replica = new Replica(replicaId, backendId, version, schemaHash, dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL, lastFailedVersion, version);