Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatur

std::atomic_ulong s_report_version(time(nullptr) * 10000);

void increase_report_version() {
s_report_version.fetch_add(1, std::memory_order_relaxed);
}

// FIXME(plat1ko): Paired register and remove task info
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
Expand Down Expand Up @@ -197,7 +201,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
}

if (status.ok()) {
s_report_version.fetch_add(1, std::memory_order_relaxed);
increase_report_version();
}

// Return result to fe
Expand Down Expand Up @@ -1381,7 +1385,7 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)
.tag("tablet_id", create_tablet_req.tablet_id)
.error(status);
} else {
s_report_version.fetch_add(1, std::memory_order_relaxed);
increase_report_version();
// get path hash of the created tablet
TabletSharedPtr tablet;
{
Expand Down Expand Up @@ -1476,7 +1480,7 @@ void push_callback(const TAgentTaskRequest& req) {
.tag("signature", req.signature)
.tag("tablet_id", push_req.tablet_id)
.tag("push_type", push_req.push_type);
++s_report_version;
increase_report_version();
finish_task_request.__set_finish_tablet_infos(tablet_infos);
} else {
LOG_WARNING("failed to execute push task")
Expand Down Expand Up @@ -1743,6 +1747,10 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
LOG_INFO("successfully clone tablet")
.tag("signature", req.signature)
.tag("tablet_id", clone_req.tablet_id);
if (engine_task.is_new_tablet()) {
increase_report_version();
finish_task_request.__set_report_version(s_report_version);
}
finish_task_request.__set_finish_tablet_infos(tablet_infos);
}

Expand Down
11 changes: 6 additions & 5 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ Status EngineCloneTask::_do_clone() {
tablet->tablet_id(), tablet->replica_id(), false));
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
_is_new_tablet = tablet == nullptr;
// try to incremental clone
std::vector<Version> missed_versions;
// try to repair a tablet with missing version
Expand Down Expand Up @@ -229,7 +229,7 @@ Status EngineCloneTask::_do_clone() {
if (missed_versions.empty()) {
LOG(INFO) << "missed version size = 0, skip clone and return success. tablet_id="
<< _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id;
RETURN_IF_ERROR(_set_tablet_info(is_new_tablet));
RETURN_IF_ERROR(_set_tablet_info());
return Status::OK();
}

Expand Down Expand Up @@ -308,10 +308,11 @@ Status EngineCloneTask::_do_clone() {
TabletMeta::construct_header_file_path(tablet_dir, _clone_req.tablet_id);
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(header_path));
}
return _set_tablet_info(is_new_tablet);

return _set_tablet_info();
}

Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) {
Status EngineCloneTask::_set_tablet_info() {
// Get clone tablet info
TTabletInfo tablet_info;
tablet_info.__set_tablet_id(_clone_req.tablet_id);
Expand All @@ -321,7 +322,7 @@ Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) {
if (_clone_req.__isset.version && tablet_info.version < _clone_req.version) {
// if it is a new tablet and clone failed, then remove the tablet
// if it is incremental clone, then must not drop the tablet
if (is_new_tablet) {
if (_is_new_tablet) {
// we need to check if this cloned table's version is what we expect.
// if not, maybe this is a stale remaining table which is waiting for drop.
// we drop it.
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class EngineCloneTask : public EngineTask {
vector<TTabletInfo>* tablet_infos);
~EngineCloneTask() {}

bool is_new_tablet() const { return _is_new_tablet; }

private:
Status _do_clone();

Expand All @@ -72,7 +74,7 @@ class EngineCloneTask : public EngineTask {
const vector<Version>& missing_versions,
bool* allow_incremental_clone);

Status _set_tablet_info(bool is_new_tablet);
Status _set_tablet_info();

// Download tablet files from
Status _download_files(DataDir* data_dir, const std::string& remote_url_prefix,
Expand All @@ -95,6 +97,7 @@ class EngineCloneTask : public EngineTask {
int64_t _copy_size;
int64_t _copy_time_ms;
std::vector<PendingRowsetGuard> _pending_rs_guards;
bool _is_new_tablet = false;
}; // EngineTask

} // namespace doris
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ private void finishDropReplica(AgentTask task) {
private void finishClone(AgentTask task, TFinishTaskRequest request) {
CloneTask cloneTask = (CloneTask) task;
if (cloneTask.getTaskVersion() == CloneTask.VERSION_2) {
if (request.isSetReportVersion()) {
long reportVersion = request.getReportVersion();
Env.getCurrentSystemInfo().updateBackendReportVersion(
task.getBackendId(), reportVersion, task.getDbId(), task.getTableId());
}
Env.getCurrentEnv().getTabletScheduler().finishCloneTask(cloneTask, request);
} else {
LOG.warn("invalid clone task, ignore it. {}", task);
Expand Down