Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
auto& shard = _get_tablets_shard(tablet_id);
std::lock_guard wrlock(shard.lock);
if (shard.tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task";
return Status::Aborted("aborted");
return Status::Aborted("tablet {} is under clone, skip drop task", tablet_id);
}
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
Expand All @@ -467,12 +466,9 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
// We should compare replica id to avoid dropping new cloned tablet.
// Iff request replica id is 0, FE may be an older release, then we drop this tablet as before.
if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
LOG(WARNING) << "fail to drop tablet because replica id not match. "
<< "tablet_id=" << tablet_id << ", replica_id=" << to_drop_tablet->replica_id()
<< ", request replica_id=" << replica_id;
return Status::Aborted("aborted");
return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(),
replica_id);
}

_remove_tablet_from_partition(to_drop_tablet);
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map.erase(tablet_id);
Expand Down Expand Up @@ -1025,10 +1021,10 @@ Status TabletManager::start_trash_sweep() {
return Status::OK();
} // start_trash_sweep

void TabletManager::register_clone_tablet(int64_t tablet_id) {
bool TabletManager::register_clone_tablet(int64_t tablet_id) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_clone.insert(tablet_id);
return shard.tablets_under_clone.insert(tablet_id).second;
}

void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ class TabletManager {

void obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info, int64_t num);

void register_clone_tablet(int64_t tablet_id);
// return `true` if register success
bool register_clone_tablet(int64_t tablet_id);
void unregister_clone_tablet(int64_t tablet_id);

void get_tablets_distribution_on_different_disks(
Expand Down
24 changes: 12 additions & 12 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
// Entry point of the clone task: registers the tablet as "under clone", runs _do_clone(),
// then unregisters it and returns the clone status.
// Returns InternalError without cloning if the tablet is already registered by another
// clone; in that case the registration is left untouched because this task does not own it.
Status EngineCloneTask::execute() {
    SCOPED_ATTACH_TASK(_mem_tracker);
    // register the tablet to avoid it being dropped during the clone process.
    // register_clone_tablet() returns `true` on success, so a `false` result means the
    // tablet is already being cloned and this task must not proceed.
    if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
        return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
    }
    Status st = _do_clone();
    StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
    return st;
Expand All @@ -86,6 +88,13 @@ Status EngineCloneTask::_do_clone() {
std::vector<Version> missed_versions;
// try to repair a tablet with missing version
if (tablet != nullptr) {
if (tablet->replica_id() != _clone_req.replica_id) {
// `tablet` may be a dropped replica in FE, e.g: BE1 migrates replica of tablet_1 to BE2,
// but before BE1 drop this replica, another new replica of tablet_1 is migrated to BE1.
// If we allow to clone success on dropped replica, replica id may never be consistent between FE and BE.
return Status::InternalError("replica_id not match({} vs {})", tablet->replica_id(),
_clone_req.replica_id);
}
std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>();
Expand All @@ -101,15 +110,7 @@ Status EngineCloneTask::_do_clone() {
// completed. Or remote be will just return header not the rowset files. clone will failed.
if (missed_versions.empty()) {
LOG(INFO) << "missed version size = 0, skip clone and return success. tablet_id="
<< _clone_req.tablet_id << " req replica=" << _clone_req.replica_id;
if (_clone_req.replica_id != tablet->replica_id()) {
// update replica id to meet cooldown replica
tablet->tablet_meta()->set_replica_id(_clone_req.replica_id);
{
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
}
<< _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id;
_set_tablet_info(is_new_tablet);
return Status::OK();
}
Expand All @@ -118,15 +119,14 @@ Status EngineCloneTask::_do_clone() {
<< ", allow_incremental_clone=" << allow_incremental_clone
<< ", signature=" << _signature << ", tablet_id=" << _clone_req.tablet_id
<< ", committed_version=" << _clone_req.committed_version
<< ", req replica=" << _clone_req.replica_id;
<< ", replica_id=" << _clone_req.replica_id;

// try to download missing version from src backend.
// if tablet on src backend does not contains missing version, it will download all versions,
// and set allow_incremental_clone to false
RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path,
&src_host, &src_file_path, missed_versions,
&allow_incremental_clone));

RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version,
allow_incremental_clone));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,12 @@ private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo
throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist");
}

// check replica id
long replicaId = backendTabletInfo.getReplicaId();
if (replicaId <= 0) {
throw new MetaNotFoundException("replica id is invalid");
}

long visibleVersion = partition.getVisibleVersion();

// check replica version
Expand Down Expand Up @@ -1129,8 +1135,7 @@ private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo
} else if (version < partition.getCommittedVersion()) {
lastFailedVersion = partition.getCommittedVersion();
}

long replicaId = Env.getCurrentEnv().getNextId();
// use replicaId reported by BE to maintain replica meta consistent between FE and BE
Replica replica = new Replica(replicaId, backendId, version, schemaHash,
dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL,
lastFailedVersion, version);
Expand Down