Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2263,19 +2263,39 @@ AgentStatus TaskWorkerPool::_move_dir(
return DORIS_TASK_REQUEST_ERROR;
}

std::string dest_tablet_dir = tablet->construct_dir_path();
std::string dest_tablet_dir = tablet->tablet_path();
std::string store_path = tablet->store()->path();

// same as finish_clone() in OlapEngine, lock them all
tablet->obtain_base_compaction_lock();
tablet->obtain_cumulative_lock();
tablet->obtain_push_lock();
tablet->obtain_header_wrlock();
SnapshotLoader loader(_env, job_id, tablet_id);
Status status = loader.move(src, dest_tablet_dir, store_path, overwrite);
// unlock
tablet->release_header_lock();
tablet->release_push_lock();
tablet->release_cumulative_lock();
tablet->release_base_compaction_lock();

if (!status.ok()) {
OLAP_LOG_WARNING("move failed. job id: %ld, msg: %s",
job_id, status.get_error_msg().c_str());
LOG(WARNING) << "move failed. job id: " << job_id << ", msg: " << status.get_error_msg();
error_msgs->push_back(status.get_error_msg());
return DORIS_INTERNAL_ERROR;
}

// reload tablet
OLAPStatus ost = OLAPEngine::get_instance()->load_one_tablet(
tablet->store(), tablet_id, schema_hash, dest_tablet_dir, true);
if (ost != OLAP_SUCCESS) {
std::stringstream ss;
ss << "failed to reload tablet: " << tablet_id;
LOG(WARNING) << ss.str();
error_msgs->push_back(ss.str());
return DORIS_INTERNAL_ERROR;
}
LOG(INFO) << "finished to reload tablet: " << tablet_id << " after move dir";
return DORIS_SUCCESS;
}

Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/olap_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2054,10 +2054,6 @@ string OLAPTable::construct_file_name(const Version& version,
return file_name;
}

// Returns a copy of the `_tablet_path` member.
// NOTE(review): this is equivalent to what the `tablet_path()` accessor
// returns; this diff replaces the call to this method with `tablet_path()`.
string OLAPTable::construct_dir_path() const {
return _tablet_path;
}

int32_t OLAPTable::get_field_index(const string& field_name) const {
field_index_map_t::const_iterator res_iterator = _field_index_map.find(field_name);
if (res_iterator == _field_index_map.end()) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/olap_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
int32_t segment_group_id, int32_t segment,
const std::string& suffix) const;

std::string construct_dir_path() const;

// Return -1 if field name is invalid, else return field index in schema.
int32_t get_field_index(const std::string& field_name) const;

Expand Down Expand Up @@ -639,7 +637,7 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
return _storage_root_path;
}

std::string tablet_path() {
std::string tablet_path() const {
return _tablet_path;
}

Expand Down
52 changes: 30 additions & 22 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ Status SnapshotLoader::download(
// If overwrite, just replace the tablet_path with snapshot_path,
// else: (TODO)
//
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
Status SnapshotLoader::move(
const std::string& snapshot_path,
const std::string& tablet_path,
Expand Down Expand Up @@ -538,6 +539,18 @@ Status SnapshotLoader::move(
std::vector<std::string> snapshot_files;
RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files));

// 0. check all existing tablet files, revoke file if it is in GC queue
std::vector<std::string> tablet_files;
RETURN_IF_ERROR(_get_existing_files_from_local(tablet_path, &tablet_files));
std::vector<std::string> files_to_check;
for (auto& snapshot_file : snapshot_files) {
if (std::find(tablet_files.begin(), tablet_files.end(), snapshot_file) != tablet_files.end()) {
std::string file_path = tablet_path + "/" + snapshot_file;
files_to_check.emplace_back(std::move(file_path));
}
}
OLAPEngine::get_instance()->revoke_files_from_gc(files_to_check);

// 1. simply delete the old dir and replace it with the snapshot dir
try {
// This remove seems safe enough, because we already get
Expand All @@ -555,12 +568,25 @@ Status SnapshotLoader::move(
return Status(ss.str());
}

// copy files one by one
// link files one by one
// files in snapshot dir will be moved in snapshot clean process
std::vector<std::string> linked_files;
for (auto& file : snapshot_files) {
std::string full_src_path = snapshot_path + "/" + file;
std::string full_dest_path = tablet_path + "/" + file;
RETURN_IF_ERROR(FileUtils::copy_file(full_src_path, full_dest_path));
VLOG(2) << "copy file from " << full_src_path<< " to " << full_dest_path;
if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this function returns a failure, who will clean up the files that have already been linked?

Also, please get the error message for the link error.

LOG(WARNING) << "failed to link file from " << full_src_path
<< " to " << full_dest_path << ", err: " << std::strerror(errno);

// clean the already linked files
for (auto& linked_file : linked_files) {
remove(linked_file.c_str());
}

return Status("move tablet failed");
}
linked_files.push_back(full_dest_path);
VLOG(2) << "link file from " << full_src_path << " to " << full_dest_path;
}

} else {
Expand Down Expand Up @@ -698,25 +724,7 @@ Status SnapshotLoader::move(
}
}

// fixme: there is no header now and can not call load_one_tablet here
// reload header
OlapStore* store = OLAPEngine::get_instance()->get_store(store_path);
if (store == nullptr) {
std::stringstream ss;
ss << "failed to get store by path: " << store_path;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
OLAPStatus ost = OLAPEngine::get_instance()->load_one_tablet(
store, tablet_id, schema_hash, tablet_path, true);
if (ost != OLAP_SUCCESS) {
std::stringstream ss;
ss << "failed to reload header of tablet: " << tablet_id;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
LOG(INFO) << "finished to reload header of tablet: " << tablet_id;

LOG(INFO) << "finished to move tablet: " << tablet_id;
return status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -283,6 +284,11 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
throw new MetaNotFoundException("could not find table for tablet [" + tabletId + "]");
}

if (tbl.getState() == OlapTableState.RESTORE) {
throw new LoadException("Table " + tbl.getName() + " is in restore process. "
+ "Can not load into it");
}

long partitionId = tabletInvertedIndex.getPartitionId(tabletId);
if (tbl.getPartition(partitionId) == null) {
throw new MetaNotFoundException("could not find partition for tablet [" + tabletId + "]");
Expand Down