Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
}

for (auto& thread : clean_old_file_threads) {
thread.join();
thread.detach();
}
}

Expand Down
16 changes: 15 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,21 @@ void Tablet::_print_missed_versions(const std::vector<Version>& missed_versions)
<< " next_id=" << _tablet_meta->get_cur_rowset_id();
return OLAP_ERR_ROWSET_INVALID;
}
Version version = {rowset->start_version(), rowset->end_version()};
RowsetSharedPtr exist_rs = get_rowset_by_version(version);
// if there exist a
if (exist_rs != nullptr && exist_rs->version_hash() == 0) {
vector<RowsetSharedPtr> to_add;
vector<RowsetSharedPtr> to_delete;
to_delete.push_back(exist_rs);
RETURN_NOT_OK(modify_rowsets(to_add, to_delete));
}

return OLAP_SUCCESS;
}
}

OLAPStatus Tablet::set_partition_id(int64_t partition_id) {
return _tablet_meta->set_partition_id(partition_id);
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
OLAPStatus next_rowset_id(RowsetId* id);
OLAPStatus set_next_rowset_id(RowsetId new_rowset_id);

OLAPStatus set_partition_id(int64_t partition_id);

private:
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
OLAPStatus _check_added_rowset(const RowsetSharedPtr& rowset);
Expand Down
13 changes: 12 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,8 @@ OLAPStatus TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
if (rs->rowset_id() != rs_meta->rowset_id()) {
LOG(WARNING) << "version already exist. rowset_id=" << rs->rowset_id()
<< " start_version=" << rs_meta->start_version()
<< ", end_version=" << rs_meta->end_version();
<< ", end_version=" << rs_meta->end_version()
<< ", tablet=" << full_name();
return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
} else {
// rowsetid,version is equal, it is a duplicate req, skip it
Expand Down Expand Up @@ -757,4 +758,14 @@ std::string TabletMeta::full_name() const {
return ss.str();
}

OLAPStatus TabletMeta::set_partition_id(int64_t partition_id) {
if ((_partition_id > 0 && _partition_id != partition_id) || partition_id < 1) {
LOG(FATAL) << "cur partition id=" << _partition_id
<< " new partition id=" << partition_id
<< " not equal";
}
_partition_id = partition_id;
return OLAP_SUCCESS;
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class TabletMeta {

std::string full_name() const;

OLAPStatus set_partition_id(int64_t partition_id);

private:
OLAPStatus _save_meta(DataDir* data_dir);

Expand Down
52 changes: 32 additions & 20 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,37 @@ OLAPStatus EngineCloneTask::execute() {
<< ", schema_hash:" << _clone_req.schema_hash
<< ", committed_version:" << _clone_req.committed_version;

// try to incremental clone
vector<Version> missed_versions;
tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
LOG(INFO) << "finish to calculate missed versions when clone. "
<< "tablet=" << tablet->full_name()
<< ", committed_version=" << _clone_req.committed_version
<< ", missed_versions_size=" << missed_versions.size();
// if missed version size is 0, then it is useless to clone from remote be, it means local data is
// completed. Or remote be will just return header not the rowset files. clone will failed.
if (missed_versions.size() == 0) {
LOG(INFO) << "missed version size = 0, skip clone and reture success";
return OLAP_SUCCESS;
}
// get download path
string local_data_path = tablet->tablet_path() + CLONE_PREFIX;

bool allow_incremental_clone = false;
status = _clone_copy(*(tablet->data_dir()), _clone_req, _signature, local_data_path,
&src_host, &src_file_path, _error_msgs,
&missed_versions,
&allow_incremental_clone,
tablet);
// check if current tablet has version == 2 and version hash == 0
// version 2 may be an invalid rowset
Version version_2 = {2, 2};
RowsetSharedPtr rowset_version2 = tablet->get_rowset_by_version(version_2);
if (rowset_version2 == nullptr || rowset_version2->version_hash() != 0) {
// try to incremental clone
vector<Version> missed_versions;
tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
LOG(INFO) << "finish to calculate missed versions when clone. "
<< "tablet=" << tablet->full_name()
<< ", committed_version=" << _clone_req.committed_version
<< ", missed_versions_size=" << missed_versions.size();
// if missed version size is 0, then it is useless to clone from remote be, it means local data is
// completed. Or remote be will just return header not the rowset files. clone will failed.
if (missed_versions.size() == 0) {
LOG(INFO) << "missed version size = 0, skip clone and return success";
_set_tablet_info(DORIS_SUCCESS, is_new_tablet);
return OLAP_SUCCESS;
}
status = _clone_copy(*(tablet->data_dir()), _clone_req, _signature, local_data_path,
&src_host, &src_file_path, _error_msgs,
&missed_versions,
&allow_incremental_clone,
tablet);
} else {
LOG(INFO) << "current tablet has invalid rowset with version == 2 and version_hash == 0, try to full clone"
<< " tablet info = " << tablet->full_name();
}
if (status == DORIS_SUCCESS && allow_incremental_clone) {
OLAPStatus olap_status = _finish_clone(tablet, local_data_path, _clone_req.committed_version, allow_incremental_clone);
if (olap_status != OLAP_SUCCESS) {
Expand Down Expand Up @@ -209,7 +218,11 @@ OLAPStatus EngineCloneTask::execute() {
}
}
}
_set_tablet_info(status, is_new_tablet);
return OLAP_SUCCESS;
}

void EngineCloneTask::_set_tablet_info(AgentStatus status, bool is_new_tablet) {
// Get clone tablet info
if (status == DORIS_SUCCESS || status == DORIS_CREATE_TABLE_EXIST) {
TTabletInfo tablet_info;
Expand Down Expand Up @@ -267,7 +280,6 @@ OLAPStatus EngineCloneTask::execute() {
}
}
*_res_status = status;
return OLAP_SUCCESS;
}

AgentStatus EngineCloneTask::_clone_copy(
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class EngineCloneTask : public EngineTask {

OLAPStatus _convert_to_new_snapshot(DataDir& data_dir, const string& clone_dir, int64_t tablet_id);

void _set_tablet_info(AgentStatus status, bool is_new_tablet);

private:
const TCloneReq& _clone_req;
vector<string>* _error_msgs;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ OLAPStatus EnginePublishVersionTask::finish() {
if (publish_status != OLAP_SUCCESS) {
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:" << rowset->rowset_id()
<< "tablet id: " << tablet_info.tablet_id
<< "txn id:" << transaction_id;
<< "txn id:" << transaction_id
<< "res:" << publish_status;
_error_tablet_ids->push_back(tablet_info.tablet_id);
res = publish_status;
continue;
Expand Down