Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ OLAPStatus DataDir::load() {
// there should be only preparing rowset in meta env because visible
// rowset is persist with tablet meta currently
OLAPStatus publish_status = tablet->add_inc_rowset(rowset);
if (publish_status != OLAP_SUCCESS) {
if (publish_status != OLAP_SUCCESS && publish_status != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "add visilbe rowset to tablet failed rowset_id:" << rowset->rowset_id()
<< " tablet id: " << rowset_meta->tablet_id()
<< " txn id:" << rowset_meta->txn_id()
Expand Down
85 changes: 50 additions & 35 deletions be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,23 +698,29 @@ OLAPStatus SegmentGroup::make_snapshot(const std::string& snapshot_path,
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string snapshot_data_file_name = construct_data_file_path(snapshot_path, segment_id);
if (!check_dir_existed(snapshot_data_file_name)) {
std::string cur_data_file_name = construct_data_file_path(segment_id);
if (link(cur_data_file_name.c_str(), snapshot_data_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << cur_data_file_name << ", "
<< "to=" << snapshot_data_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
if (check_dir_existed(snapshot_data_file_name)) {
LOG(WARNING) << "snapshot dest file already exist, fail to make snapshot."
<< " file=" << snapshot_data_file_name;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string cur_data_file_name = construct_data_file_path(segment_id);
if (link(cur_data_file_name.c_str(), snapshot_data_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << cur_data_file_name << ", "
<< "to=" << snapshot_data_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
success_links->push_back(snapshot_data_file_name);
std::string snapshot_index_file_name = construct_index_file_path(snapshot_path, segment_id);
if (!check_dir_existed(snapshot_index_file_name)) {
std::string cur_index_file_name = construct_index_file_path(segment_id);
if (link(cur_index_file_name.c_str(), snapshot_index_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << cur_index_file_name << ", "
<< "to=" << snapshot_index_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
if (check_dir_existed(snapshot_index_file_name)) {
LOG(WARNING) << "snapshot dest file already exist, fail to make snapshot."
<< " file=" << snapshot_index_file_name;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string cur_index_file_name = construct_index_file_path(segment_id);
if (link(cur_index_file_name.c_str(), snapshot_index_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << cur_index_file_name << ", "
<< "to=" << snapshot_index_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
success_links->push_back(snapshot_index_file_name);
}
Expand Down Expand Up @@ -757,7 +763,10 @@ OLAPStatus SegmentGroup::copy_files_to_path(const std::string& dest_path,
return OLAP_SUCCESS;
}


// when convert from old files, remove existing files
// convert from old files in 2 cases:
// case 1: clone from old version be
// case 2: upgrade to new version be
OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_links) {
if (_empty) {
Expand All @@ -766,29 +775,35 @@ OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string new_data_file_name = construct_data_file_path(_rowset_path_prefix, segment_id);
if (!check_dir_existed(new_data_file_name)) {
std::string old_data_file_name = construct_old_data_file_path(snapshot_path, segment_id);
if (link(old_data_file_name.c_str(), new_data_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << old_data_file_name << ", "
<< "to=" << new_data_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
} else {
VLOG(3) << "link data file from " << old_data_file_name
<< " to " << new_data_file_name << " successfully";
}
// if file exist should remove it because same file name does not mean same data
if (check_dir_existed(new_data_file_name)) {
LOG(INFO) << "file already exist, remove it. file=" << new_data_file_name;
RETURN_NOT_OK(remove_dir(new_data_file_name));
}
std::string old_data_file_name = construct_old_data_file_path(snapshot_path, segment_id);
if (link(old_data_file_name.c_str(), new_data_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << old_data_file_name
<< ", to=" << new_data_file_name << ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
} else {
VLOG(3) << "link data file from " << old_data_file_name
<< " to " << new_data_file_name << " successfully";
}
success_links->push_back(new_data_file_name);
std::string new_index_file_name = construct_index_file_path(_rowset_path_prefix, segment_id);
if (!check_dir_existed(new_index_file_name)) {
std::string old_index_file_name = construct_old_index_file_path(snapshot_path, segment_id);
if (link(old_index_file_name.c_str(), new_index_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << old_index_file_name << ", "
<< "to=" << new_index_file_name << ", " << "errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
} else {
VLOG(3) << "link index file from " << old_index_file_name
<< " to " << new_index_file_name << " successfully";
}
if (check_dir_existed(new_index_file_name)) {
LOG(INFO) << "file already exist, remove it. file=" << new_index_file_name;
RETURN_NOT_OK(remove_dir(new_index_file_name));
}
std::string old_index_file_name = construct_old_index_file_path(snapshot_path, segment_id);
if (link(old_index_file_name.c_str(), new_index_file_name.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << old_index_file_name
<< ", to=" << new_index_file_name
<< ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
} else {
VLOG(3) << "link index file from " << old_index_file_name
<< " to " << new_index_file_name << " successfully";
}
success_links->push_back(new_index_file_name);
}
Expand Down
23 changes: 22 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,34 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
new_tablet_meta->revise_rs_metas(rs_metas);
}
if (snapshot_version < PREFERRED_SNAPSHOT_VERSION) {
set<string> exist_old_files;
if ((res = dir_walk(schema_full_path, nullptr, &exist_old_files)) != OLAP_SUCCESS) {
LOG(WARNING) << "failed to dir walk when convert old files. dir="
<< schema_full_path;
break;
}
OlapSnapshotConverter converter;
TabletMetaPB tablet_meta_pb;
OLAPHeaderMessage olap_header_msg;
new_tablet_meta->to_meta_pb(&tablet_meta_pb);
converter.to_old_snapshot(tablet_meta_pb, schema_full_path, schema_full_path, &olap_header_msg);
res = converter.to_old_snapshot(tablet_meta_pb, schema_full_path, schema_full_path, &olap_header_msg);
if (res != OLAP_SUCCESS) {
break;
}
// convert new version files to old version files successuflly, then should remove the old files
vector<string> files_to_delete;
for (auto file_name : exist_old_files) {
string full_file_path = schema_full_path + "/" + file_name;
files_to_delete.push_back(full_file_path);
}
// remove all files
res = remove_files(files_to_delete);
if (res != OLAP_SUCCESS) {
break;
}
// save new header to snapshot header path
res = converter.save(header_path, olap_header_msg);
LOG(INFO) << "finished convert new snapshot to old snapshot, res=" << res;
} else {
res = new_tablet_meta->save(header_path);
}
Expand Down
21 changes: 20 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,14 +899,33 @@ void Tablet::_print_missed_versions(const std::vector<Version>& missed_versions)
}
Version version = {rowset->start_version(), rowset->end_version()};
RowsetSharedPtr exist_rs = get_rowset_by_version(version);
// if there exist a
// if there exist a rowset with version_hash == 0, should delete it
if (exist_rs != nullptr && exist_rs->version_hash() == 0) {
vector<RowsetSharedPtr> to_add;
vector<RowsetSharedPtr> to_delete;
to_delete.push_back(exist_rs);
RETURN_NOT_OK(modify_rowsets(to_add, to_delete));
}

// check if there exist a rowset contains the added rowset
for (auto& it : _rs_version_map) {
if (it.first.first <= rowset->start_version()
&& it.first.second >= rowset->end_version()) {
if (it.second == nullptr) {
LOG(FATAL) << "there exist a version "
<< " start_version=" << it.first.first
<< " end_version=" << it.first.second
<< " contains the input rs with version "
<< " start_version=" << rowset->start_version()
<< " end_version=" << rowset->end_version()
<< " but the related rs is null";
return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
} else {
return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
}
}
}

return OLAP_SUCCESS;
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ AgentStatus EngineCloneTask::_clone_copy(
bool* allow_incremental_clone,
TabletSharedPtr tablet) {
AgentStatus status = DORIS_SUCCESS;

std::string token = _master_info.token;
for (auto src_backend : clone_req.src_backends) {
stringstream http_host_stream;
Expand Down Expand Up @@ -371,6 +370,9 @@ AgentStatus EngineCloneTask::_clone_copy(
// Check local path exist, if exist, remove it, then create the dir
// local_file_full_path = tabletid/clone, for a specific tablet, there should be only one folder
// if this folder exists, then should remove it
// for example, BE clone from BE 1 to download file 1 with version (2,2), but clone from BE 1 failed
// then it will try to clone from BE 2, but it will find the file 1 already exist, but file 1 with same
// name may have different versions.
if (status == DORIS_SUCCESS) {
boost::filesystem::path local_file_full_dir(local_file_full_path);
if (boost::filesystem::exists(local_file_full_dir)) {
Expand Down Expand Up @@ -597,7 +599,7 @@ OLAPStatus EngineCloneTask::_convert_to_new_snapshot(DataDir& data_dir, const st
files_to_delete.push_back(full_file_path);
}
// remove all files
remove_files(files_to_delete);
RETURN_NOT_OK(remove_files(files_to_delete));

res = TabletMeta::save(cloned_meta_file, tablet_meta_pb);
if (res != OLAP_SUCCESS) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ OLAPStatus EnginePublishVersionTask::finish() {
}
// add visible rowset to tablet
publish_status = tablet->add_inc_rowset(rowset);
if (publish_status != OLAP_SUCCESS) {
if (publish_status != OLAP_SUCCESS && publish_status != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:" << rowset->rowset_id()
<< "tablet id: " << tablet_info.tablet_id
<< "txn id:" << transaction_id
Expand All @@ -104,9 +104,10 @@ OLAPStatus EnginePublishVersionTask::finish() {
res = publish_status;
continue;
}
if (publish_status == OLAP_SUCCESS) {
if (publish_status == OLAP_SUCCESS || publish_status == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(INFO) << "publish version successfully on tablet. tablet=" << tablet->full_name()
<< ", transaction_id=" << transaction_id << ", version=" << version.first;
<< ", transaction_id=" << transaction_id << ", version=" << version.first
<< ", res=" << publish_status;
// delete rowset from meta env, because add inc rowset alreay saved the rowset meta to tablet meta
RowsetMetaManager::remove(tablet->data_dir()->get_meta(), tablet->tablet_uid(), rowset->rowset_id());
// delete txn info
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,8 @@ OLAPStatus copy_dir(const string &src_dir, const string &dst_dir) {
return OLAP_SUCCESS;
}

void remove_files(const vector<string>& files) {
OLAPStatus remove_files(const vector<string>& files) {
OLAPStatus res = OLAP_SUCCESS;
for (const string& file : files) {
boost::filesystem::path file_path(file);

Expand All @@ -1343,11 +1344,13 @@ void remove_files(const vector<string>& files) {
} else {
OLAP_LOG_WARNING("failed to remove file. [file=%s errno=%d]",
file.c_str(), Errno::no());
res = OLAP_ERR_IO_ERROR;
}
} catch (...) {
// do nothing
}
}
return res;
}

// failed when there are files or dirs under thr dir
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ OLAPStatus create_dirs(const std::string& path);

OLAPStatus copy_dir(const std::string &src_dir, const std::string &dst_dir);

void remove_files(const std::vector<std::string>& files);
OLAPStatus remove_files(const std::vector<std::string>& files);

OLAPStatus remove_dir(const std::string& path);

Expand Down