Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 51 additions & 45 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,31 +614,23 @@ OLAPStatus DataDir::_convert_old_tablet() {

OLAPStatus DataDir::remove_old_meta_and_files() {
// clean old meta(olap header message)
auto clean_old_meta_func = [this](int64_t tablet_id,
int32_t schema_hash, const std::string& value) -> bool {
TabletMetaManager::remove(this, tablet_id, schema_hash, OLD_HEADER_PREFIX);
LOG(INFO) << "successfully clean old tablet meta(olap header) for tablet="
<< tablet_id << "." << schema_hash
<< "from data dir: " << _path;
return true;
};
OLAPStatus clean_old_meta_status = TabletMetaManager::traverse_headers(_meta,
clean_old_meta_func, OLD_HEADER_PREFIX);
if (clean_old_meta_status != OLAP_SUCCESS) {
// If failed to clean meta just skip the error, there will be useless metas in rocksdb column family
LOG(WARNING) << "there is failure when clean old tablet meta(olap header) from data dir:" << _path;
} else {
LOG(INFO) << "successfully clean old tablet meta(olap header) from data dir: " << _path;
}

// clean old files because they have hard links in new file name format
auto clean_old_files_func = [this](int64_t tablet_id,
auto clean_old_meta_files_func = [this](int64_t tablet_id,
int32_t schema_hash, const std::string& value) -> bool {
// convert olap header and files
OLAPHeaderMessage olap_header_msg;
TabletMetaPB tablet_meta_pb;
bool parsed = tablet_meta_pb.ParseFromString(value);
vector<RowsetMetaPB> pending_rowsets;
bool parsed = olap_header_msg.ParseFromString(value);
if (!parsed) {
// if errors when load, just skip it
LOG(WARNING) << "failed to load tablet meta from meta store to tablet=" << tablet_id << "." << schema_hash;
LOG(FATAL) << "convert olap header to tablet meta failed when load olap header tablet="
<< tablet_id << "." << schema_hash;
return true;
}
OlapSnapshotConverter converter;
OLAPStatus status = converter.to_tablet_meta_pb(olap_header_msg, &tablet_meta_pb, &pending_rowsets);
if (status != OLAP_SUCCESS) {
LOG(FATAL) << "convert olap header to tablet meta failed when convert header and files tablet="
<< tablet_id << "." << schema_hash;
return true;
}

Expand All @@ -651,36 +643,50 @@ OLAPStatus DataDir::remove_old_meta_and_files() {
RowsetMetaSharedPtr alpha_rowset_meta(new AlphaRowsetMeta());
alpha_rowset_meta->init_from_pb(visible_rowset);
AlphaRowset rowset(&tablet_schema, data_path_prefix, this, alpha_rowset_meta);
rowset.init();
rowset.load();
if (rowset.init() != OLAP_SUCCESS) {
LOG(INFO) << "errors while init rowset. tablet_path=" << data_path_prefix;
return true;
}
std::vector<std::string> old_files;
rowset.remove_old_files(&old_files);
if (rowset.remove_old_files(&old_files) != OLAP_SUCCESS) {
LOG(INFO) << "errors while remove_old_files. tablet_path=" << data_path_prefix;
return true;
}
}

// convert inc delta file to rowsets and remove old files
for (auto& inc_rs_meta : tablet_meta_pb.inc_rs_metas()) {
RowsetMetaSharedPtr alpha_rowset_meta(new AlphaRowsetMeta());
alpha_rowset_meta->init_from_pb(inc_rs_meta);
AlphaRowset rowset(&tablet_schema, data_path_prefix, this, alpha_rowset_meta);
rowset.init();
rowset.load();
// check if the rowset is successfully converted
// incremental delta is saved in a seperate incremental folder
// create a mock rowset to delete its files
AlphaRowset inc_rowset(&tablet_schema, data_path_prefix + "/incremental_delta",
this, alpha_rowset_meta);
inc_rowset.init();
std::vector<std::string> old_files;
inc_rowset.remove_old_files(&old_files);
// remove incremental dir and pending dir
std::string pending_delta_path = data_path_prefix + PENDING_DELTA_PREFIX;
if (check_dir_existed(pending_delta_path)) {
LOG(INFO) << "remove pending delta path:" << pending_delta_path;
if(remove_dir(pending_delta_path) != OLAP_SUCCESS) {
LOG(INFO) << "errors while remove pending delta path. tablet_path=" << data_path_prefix;
return true;
}
}

std::string incremental_delta_path = data_path_prefix + INCREMENTAL_DELTA_PREFIX;
if (check_dir_existed(incremental_delta_path)) {
LOG(INFO) << "remove incremental delta path:" << incremental_delta_path;
if(remove_dir(incremental_delta_path) != OLAP_SUCCESS) {
LOG(INFO) << "errors while remove incremental delta path. tablet_path=" << data_path_prefix;
return true;
}
}

TabletMetaManager::remove(this, tablet_id, schema_hash, OLD_HEADER_PREFIX);
LOG(INFO) << "successfully clean old tablet meta(olap header) for tablet="
<< tablet_id << "." << schema_hash
<< " tablet_path=" << data_path_prefix;

return true;
};
OLAPStatus clean_old_tablet_status = TabletMetaManager::traverse_headers(_meta,
clean_old_files_func, HEADER_PREFIX);
if (clean_old_tablet_status != OLAP_SUCCESS) {
LOG(WARNING) << "there is failure when loading tablet and clean old files:" << _path;
OLAPStatus clean_old_meta_files_status = TabletMetaManager::traverse_headers(_meta,
clean_old_meta_files_func, OLD_HEADER_PREFIX);
if (clean_old_meta_files_status != OLAP_SUCCESS) {
// If failed to clean meta just skip the error, there will be useless metas in rocksdb column family
LOG(WARNING) << "there is failure when clean old tablet meta(olap header) from data dir:" << _path;
} else {
LOG(INFO) << "load rowset from meta finished, data dir: " << _path;
LOG(INFO) << "successfully clean old tablet meta(olap header) from data dir: " << _path;
}
return OLAP_SUCCESS;
}
Expand Down