Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ OLAPStatus DataDir::_convert_old_tablet() {
bool parsed = olap_header_msg.ParseFromString(value);
if (!parsed) {
LOG(FATAL) << "convert olap header to tablet meta failed when load olap header tablet="
<< tablet_id << "." << schema_hash;
<< tablet_id << "." << schema_hash;
return false;
}
string old_data_path_prefix = get_absolute_tablet_path(olap_header_msg, true);
Expand Down Expand Up @@ -612,16 +612,10 @@ OLAPStatus DataDir::_convert_old_tablet() {
return OLAP_SUCCESS;
}

OLAPStatus DataDir::_remove_old_meta_and_files(const std::set<int64_t>& tablet_ids) {
OLAPStatus DataDir::remove_old_meta_and_files() {
// clean old meta(olap header message)
auto clean_old_meta_func = [this, &tablet_ids](int64_t tablet_id,
auto clean_old_meta_func = [this](int64_t tablet_id,
int32_t schema_hash, const std::string& value) -> bool {
if (tablet_ids.find(tablet_id) == tablet_ids.end()) {
LOG(WARNING) << "tablet not load successfully, skip clean meta for tablet="
<< tablet_id << "." << schema_hash
<< " from data dir: " << _path;
return true;
}
TabletMetaManager::remove(this, tablet_id, schema_hash, OLD_HEADER_PREFIX);
LOG(INFO) << "successfully clean old tablet meta(olap header) for tablet="
<< tablet_id << "." << schema_hash
Expand All @@ -638,14 +632,8 @@ OLAPStatus DataDir::_remove_old_meta_and_files(const std::set<int64_t>& tablet_i
}

// clean old files because they have hard links in new file name format
auto clean_old_files_func = [this, &tablet_ids](int64_t tablet_id,
auto clean_old_files_func = [this](int64_t tablet_id,
int32_t schema_hash, const std::string& value) -> bool {
if (tablet_ids.find(tablet_id) == tablet_ids.end()) {
LOG(WARNING) << "tablet not load successfully, skip clean files for tablet="
<< tablet_id << "." << schema_hash
<< " from data dir: " << _path;
return true;
}
TabletMetaPB tablet_meta_pb;
bool parsed = tablet_meta_pb.ParseFromString(value);
if (!parsed) {
Expand Down Expand Up @@ -697,6 +685,19 @@ OLAPStatus DataDir::_remove_old_meta_and_files(const std::set<int64_t>& tablet_i
return OLAP_SUCCESS;
}

bool DataDir::convert_old_data_success() {
return _convert_old_data_success;
}

// Records, via the dir's meta store, that tablet conversion is finished.
// Returns OLAP_SUCCESS when the flag was saved; otherwise logs at FATAL level
// and hands back the status reported by the meta store.
OLAPStatus DataDir::set_convert_finished() {
    const OLAPStatus save_status = _meta->set_tablet_convert_finished();
    if (save_status == OLAP_SUCCESS) {
        return OLAP_SUCCESS;
    }

    // NOTE(review): LOG(FATAL) normally terminates the process (glog semantics —
    // confirm for this build), so the return below is only a fallback.
    LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path;
    return save_status;
}

// TODO(ygl): deal with rowsets and tablets when load failed
OLAPStatus DataDir::load() {
// check if this is an old data path
Expand All @@ -706,26 +707,19 @@ OLAPStatus DataDir::load() {
LOG(WARNING) << "get convert flag from meta failed dir=" << _path;
return res;
}
bool should_remove_old_files = false;
_convert_old_data_success = false;
if (!is_tablet_convert_finished) {
_clean_unfinished_converting_data();
res = _convert_old_tablet();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "convert old tablet failed for dir = " << _path;
return res;
}
// TODO(ygl): should load tablet successfully and then set convert flag and clean old files
res = _meta->set_tablet_convert_finished();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path;
return res;
}
// convert may be successfully, but crashed before remove old files
// depend on gc thread to recycle the old files
// _remove_old_meta_and_files(data_dir);
should_remove_old_files = true;

_convert_old_data_success = true;
} else {
LOG(INFO) << "tablets have been converted, skip convert process";
_convert_old_data_success = true;
}

LOG(INFO) << "start to load tablets from " << _path;
Expand Down Expand Up @@ -853,9 +847,6 @@ OLAPStatus DataDir::load() {
<< " current valid tablet uid: " << tablet->tablet_uid();
}
}
if (should_remove_old_files) {
_remove_old_meta_and_files(tablet_ids);
}
return OLAP_SUCCESS;
}

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ class DataDir {
// this function will collect garbage paths scaned by last function
void perform_path_gc();


OLAPStatus remove_old_meta_and_files();

bool convert_old_data_success();

OLAPStatus set_convert_finished();

private:
std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
Expand All @@ -127,7 +134,6 @@ class DataDir {
Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id);
OLAPStatus _clean_unfinished_converting_data();
OLAPStatus _convert_old_tablet();
OLAPStatus _remove_old_meta_and_files(const std::set<int64_t>& tablet_ids);

void _remove_check_paths_no_lock(const std::set<std::string>& paths);

Expand Down Expand Up @@ -172,6 +178,9 @@ class DataDir {

std::set<std::string> _pending_path_ids;
RWMutex _pending_path_mutex;

// used in convert process
bool _convert_old_data_success;
};

} // namespace doris
58 changes: 54 additions & 4 deletions be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,11 +797,28 @@ OLAPStatus SegmentGroup::convert_to_old_files(const std::string& snapshot_path,
OLAPStatus SegmentGroup::remove_old_files(std::vector<std::string>* links_to_remove) {
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string old_data_file_name = construct_old_data_file_path(_rowset_path_prefix, segment_id);
RETURN_NOT_OK(remove_dir(old_data_file_name));
links_to_remove->push_back(old_data_file_name);
if (check_dir_existed(old_data_file_name)) {
RETURN_NOT_OK(remove_dir(old_data_file_name));
links_to_remove->push_back(old_data_file_name);
}
std::string old_index_file_name = construct_old_index_file_path(_rowset_path_prefix, segment_id);
RETURN_NOT_OK(remove_dir(old_index_file_name));
links_to_remove->push_back(old_index_file_name);
if (check_dir_existed(old_index_file_name)) {
RETURN_NOT_OK(remove_dir(old_index_file_name));
links_to_remove->push_back(old_index_file_name);
}
// if segment group id == 0, it maybe convert from old files which do not have segment group id in file path
if (_segment_group_id == 0) {
old_data_file_name = _construct_err_sg_data_file_path(_rowset_path_prefix, segment_id);
if (check_dir_existed(old_data_file_name)) {
RETURN_NOT_OK(remove_dir(old_data_file_name));
links_to_remove->push_back(old_data_file_name);
}
old_index_file_name = _construct_err_sg_index_file_path(_rowset_path_prefix, segment_id);
if (check_dir_existed(old_index_file_name)) {
RETURN_NOT_OK(remove_dir(old_index_file_name));
links_to_remove->push_back(old_index_file_name);
}
}
}
std::string pending_delta_path = _rowset_path_prefix + PENDING_DELTA_PREFIX;
if (check_dir_existed(pending_delta_path)) {
Expand Down Expand Up @@ -857,6 +874,22 @@ std::string SegmentGroup::construct_old_data_file_path(const std::string& path_p
}
}

// Builds the ".idx" path of an old-format file whose name carries no segment
// group id. Pending segment groups use the old pending-file layout instead.
std::string SegmentGroup::_construct_err_sg_index_file_path(const std::string& path_prefix, int32_t segment_id) const {
    return _is_pending
            ? _construct_old_pending_file_path(path_prefix, segment_id, ".idx")
            : _construct_err_sg_file_path(path_prefix, segment_id, ".idx");
}

// Builds the ".dat" path of an old-format file whose name carries no segment
// group id. Pending segment groups use the old pending-file layout instead.
std::string SegmentGroup::_construct_err_sg_data_file_path(const std::string& path_prefix, int32_t segment_id) const {
    return _is_pending
            ? _construct_old_pending_file_path(path_prefix, segment_id, ".dat")
            : _construct_err_sg_file_path(path_prefix, segment_id, ".dat");
}

std::string SegmentGroup::_construct_old_pending_file_path(const std::string& path_prefix, int32_t segment_id,
const std::string& suffix) const {
std::stringstream file_path;
Expand Down Expand Up @@ -895,4 +928,21 @@ std::string SegmentGroup::_construct_old_file_path(const std::string& path_prefi
return file_path;
}

// construct file path for sg_id == -1
std::string SegmentGroup::_construct_err_sg_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this function called "err"?

char file_path[OLAP_MAX_PATH_LEN];
snprintf(file_path,
sizeof(file_path),
"%s/%ld_%ld_%ld_%ld_%d%s",
path_prefix.c_str(),
_tablet_id,
_version.first,
_version.second,
_version_hash,
segment_id,
suffix.c_str());

return file_path;
}

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ class SegmentGroup {

std::string _construct_old_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const;

std::string _construct_err_sg_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const;

std::string _construct_err_sg_index_file_path(const std::string& path_prefix, int32_t segment_id) const;

std::string _construct_err_sg_data_file_path(const std::string& path_prefix, int32_t segment_id) const;

private:
int64_t _tablet_id;
int64_t _rowset_id;
Expand Down
24 changes: 24 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
for (auto& thread : threads) {
thread.join();
}

// check whether all data dir convert successfully
for (auto data_dir : data_dirs) {
if (!data_dir->convert_old_data_success()) {
// if any dir convert failed, exit the process
LOG(FATAL) << "dir = " << data_dir->path() << "convert failed";
}
}

std::vector<std::thread> clean_old_file_threads;
for (auto data_dir : data_dirs) {
clean_old_file_threads.emplace_back([data_dir] {
data_dir->set_convert_finished();
auto res = data_dir->remove_old_meta_and_files();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to clean old files dir = " << data_dir->path()
<< " res = " << res;
}
});
}

for (auto& thread : clean_old_file_threads) {
thread.join();
}
}

OLAPStatus StorageEngine::open() {
Expand Down