diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 067f734be5d515..15accf9eff0c88 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -2263,19 +2263,39 @@ AgentStatus TaskWorkerPool::_move_dir( return DORIS_TASK_REQUEST_ERROR; } - std::string dest_tablet_dir = tablet->construct_dir_path(); + std::string dest_tablet_dir = tablet->tablet_path(); std::string store_path = tablet->store()->path(); + // same as finish_clone() in OlapEngine, lock them all + tablet->obtain_base_compaction_lock(); + tablet->obtain_cumulative_lock(); + tablet->obtain_push_lock(); + tablet->obtain_header_wrlock(); SnapshotLoader loader(_env, job_id, tablet_id); Status status = loader.move(src, dest_tablet_dir, store_path, overwrite); + // unlock + tablet->release_header_lock(); + tablet->release_push_lock(); + tablet->release_cumulative_lock(); + tablet->release_base_compaction_lock(); if (!status.ok()) { - OLAP_LOG_WARNING("move failed. job id: %ld, msg: %s", - job_id, status.get_error_msg().c_str()); + LOG(WARNING) << "move failed. job id: " << job_id << ", msg: " << status.get_error_msg(); error_msgs->push_back(status.get_error_msg()); return DORIS_INTERNAL_ERROR; } + // reload tablet + OLAPStatus ost = OLAPEngine::get_instance()->load_one_tablet( + tablet->store(), tablet_id, schema_hash, dest_tablet_dir, true); + if (ost != OLAP_SUCCESS) { + std::stringstream ss; + ss << "failed to reload tablet: " << tablet_id; + LOG(WARNING) << ss.str(); + error_msgs->push_back(ss.str()); + return DORIS_INTERNAL_ERROR; + } + LOG(INFO) << "finished to reload tablet: " << tablet_id << " after move dir"; return DORIS_SUCCESS; } diff --git a/be/src/olap/olap_table.cpp b/be/src/olap/olap_table.cpp index 1b258809d75bcc..332bf69e0733ce 100644 --- a/be/src/olap/olap_table.cpp +++ b/be/src/olap/olap_table.cpp @@ -2054,10 +2054,6 @@ string OLAPTable::construct_file_name(const Version& version, return file_name; } -string OLAPTable::construct_dir_path() const { - return _tablet_path; -} - int32_t OLAPTable::get_field_index(const string& field_name) const { field_index_map_t::const_iterator res_iterator = _field_index_map.find(field_name); if (res_iterator == _field_index_map.end()) { diff --git a/be/src/olap/olap_table.h b/be/src/olap/olap_table.h index 96c060a6c740b0..ff2d514c26dc12 100644 --- a/be/src/olap/olap_table.h +++ b/be/src/olap/olap_table.h @@ -358,8 +358,6 @@ class OLAPTable : public std::enable_shared_from_this { int32_t segment_group_id, int32_t segment, const std::string& suffix) const; - std::string construct_dir_path() const; - // Return -1 if field name is invalid, else return field index in schema. int32_t get_field_index(const std::string& field_name) const; @@ -639,7 +637,7 @@ class OLAPTable : public std::enable_shared_from_this { return _storage_root_path; } - std::string tablet_path() { + std::string tablet_path() const { return _tablet_path; } diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index b2e3a0d5956782..0ff606d3850f13 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -485,6 +485,7 @@ Status SnapshotLoader::download( // If overwrite, just replace the tablet_path with snapshot_path, // else: (TODO) // +// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock Status SnapshotLoader::move( const std::string& snapshot_path, const std::string& tablet_path, @@ -538,6 +539,18 @@ Status SnapshotLoader::move( std::vector snapshot_files; RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files)); + // 0. check all existing tablet files, revoke file if it is in GC queue + std::vector tablet_files; + RETURN_IF_ERROR(_get_existing_files_from_local(tablet_path, &tablet_files)); + std::vector files_to_check; + for (auto& snapshot_file : snapshot_files) { + if (std::find(tablet_files.begin(), tablet_files.end(), snapshot_file) != tablet_files.end()) { + std::string file_path = tablet_path + "/" + snapshot_file; + files_to_check.emplace_back(std::move(file_path)); + } + } + OLAPEngine::get_instance()->revoke_files_from_gc(files_to_check); + // 1. simply delete the old dir and replace it with the snapshot dir try { // This remove seems saft enough, because we already get @@ -555,12 +568,25 @@ Status SnapshotLoader::move( return Status(ss.str()); } - // copy files one by one + // link files one by one + // files in snapshot dir will be moved in snapshot clean process + std::vector linked_files; for (auto& file : snapshot_files) { std::string full_src_path = snapshot_path + "/" + file; std::string full_dest_path = tablet_path + "/" + file; - RETURN_IF_ERROR(FileUtils::copy_file(full_src_path, full_dest_path)); - VLOG(2) << "copy file from " << full_src_path<< " to " << full_dest_path; + if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) { + LOG(WARNING) << "failed to link file from " << full_src_path + << " to " << full_dest_path << ", err: " << std::strerror(errno); + + // clean the already linked files + for (auto& linked_file : linked_files) { + remove(linked_file.c_str()); + } + + return Status("move tablet failed"); + } + linked_files.push_back(full_dest_path); + VLOG(2) << "link file from " << full_src_path << " to " << full_dest_path; } } else { @@ -698,25 +724,7 @@ Status SnapshotLoader::move( } } - // fixme: there is no header now and can not call load_one_tablet here - // reload header - OlapStore* store = OLAPEngine::get_instance()->get_store(store_path); - if (store == nullptr) { - std::stringstream ss; - ss << "failed to get store by path: " << store_path; - LOG(WARNING) << ss.str(); - return Status(ss.str()); - } - OLAPStatus ost = OLAPEngine::get_instance()->load_one_tablet( - store, tablet_id, schema_hash, tablet_path, true); - if (ost != OLAP_SUCCESS) { - std::stringstream ss; - ss << "failed to reload header of tablet: " << tablet_id; - LOG(WARNING) << ss.str(); - return Status(ss.str()); - } - LOG(INFO) << "finished to reload header of tablet: " << tablet_id; - + LOG(INFO) << "finished to move tablet: " << tablet_id; return status; } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 1c1102500c60a8..5221139ed0a2ae 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -33,6 +33,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; @@ -283,6 +284,11 @@ public void commitTransaction(long dbId, long transactionId, List