Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ OLAPStatus DataDir::remove_old_meta_and_files() {
std::string pending_delta_path = data_path_prefix + PENDING_DELTA_PREFIX;
if (check_dir_existed(pending_delta_path)) {
LOG(INFO) << "remove pending delta path:" << pending_delta_path;
if(remove_dir(pending_delta_path) != OLAP_SUCCESS) {
if(remove_all_dir(pending_delta_path) != OLAP_SUCCESS) {
LOG(INFO) << "errors while remove pending delta path. tablet_path=" << data_path_prefix;
return true;
}
Expand All @@ -667,7 +667,7 @@ OLAPStatus DataDir::remove_old_meta_and_files() {
std::string incremental_delta_path = data_path_prefix + INCREMENTAL_DELTA_PREFIX;
if (check_dir_existed(incremental_delta_path)) {
LOG(INFO) << "remove incremental delta path:" << incremental_delta_path;
if(remove_dir(incremental_delta_path) != OLAP_SUCCESS) {
if(remove_all_dir(incremental_delta_path) != OLAP_SUCCESS) {
LOG(INFO) << "errors while remove incremental delta path. tablet_path=" << data_path_prefix;
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ OLAPStatus SegmentGroup::remove_old_files(std::vector<std::string>* links_to_rem
std::string pending_delta_path = _rowset_path_prefix + PENDING_DELTA_PREFIX;
if (check_dir_existed(pending_delta_path)) {
LOG(INFO) << "remove pending delta path:" << pending_delta_path;
RETURN_NOT_OK(remove_dir(pending_delta_path));
RETURN_NOT_OK(remove_all_dir(pending_delta_path));
}
return OLAP_SUCCESS;
}
Expand Down
180 changes: 98 additions & 82 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir(
boost::filesystem::path schema_hash_path(schema_hash_dir);
boost::filesystem::path tablet_path = schema_hash_path.parent_path();
std::string tablet_dir = tablet_path.string();
if (!check_dir_existed(schema_hash_dir)) {
// because the tablet is removed async, so that the dir may still exist
// when be receive create tablet again. For example redo schema change
if (check_dir_existed(schema_hash_dir)) {
LOG(WARNING) << "skip this dir because tablet path exist, path="<< schema_hash_dir;
continue;
} else {
data_dir->add_pending_ids(TABLET_ID_PREFIX + std::to_string(request.tablet_id));
res = create_dirs(schema_hash_dir);
if (res != OLAP_SUCCESS) {
Expand Down Expand Up @@ -549,9 +554,8 @@ OLAPStatus TabletManager::_drop_tablet_unlock(
<< ", tablet=" << related_tablet->full_name();
}
}

res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
related_tablet->release_header_lock();
res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to drop tablet which in schema change. tablet="
<< dropped_tablet->full_name();
Expand Down Expand Up @@ -717,8 +721,6 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
}

if (now - table_ptr->last_compaction_failure_time() <= config::min_compaction_failure_interval_sec * 1000) {
LOG(INFO) << "tablet last compaction failure time is: " << table_ptr->last_compaction_failure_time()
<< ", tablet: " << table_ptr->tablet_id() << ", skip it.";
continue;
}

Expand Down Expand Up @@ -965,97 +967,111 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
} // report_all_tablets_info

OLAPStatus TabletManager::start_trash_sweep() {
ReadLock rlock(&_tablet_map_lock);
std::vector<int64_t> tablets_to_clean;
for (auto& item : _tablet_map) {
// try to clean empty item
if (item.second.table_arr.empty()) {
// try to get schema change lock if could get schema change lock, then nobody
// own the lock could remove the item
// it will core if schema change thread may hold the lock and this thread will deconstruct lock
if (item.second.schema_change_lock.trylock() == OLAP_SUCCESS) {
item.second.schema_change_lock.unlock();
tablets_to_clean.push_back(item.first);
{
ReadLock rlock(&_tablet_map_lock);
std::vector<int64_t> tablets_to_clean;
for (auto& item : _tablet_map) {
// try to clean empty item
if (item.second.table_arr.empty()) {
// try to get schema change lock if could get schema change lock, then nobody
// own the lock could remove the item
// it will core if schema change thread may hold the lock and this thread will deconstruct lock
if (item.second.schema_change_lock.trylock() == OLAP_SUCCESS) {
item.second.schema_change_lock.unlock();
tablets_to_clean.push_back(item.first);
}
}
}
for (TabletSharedPtr tablet : item.second.table_arr) {
if (tablet == nullptr) {
continue;
for (TabletSharedPtr tablet : item.second.table_arr) {
if (tablet == nullptr) {
continue;
}
tablet->delete_expired_inc_rowsets();
}
tablet->delete_expired_inc_rowsets();
}
}
// clean empty tablet id item
for (const auto& tablet_id_to_clean : tablets_to_clean) {
if (_tablet_map[tablet_id_to_clean].table_arr.empty()) {
_tablet_map.erase(tablet_id_to_clean);
// clean empty tablet id item
for (const auto& tablet_id_to_clean : tablets_to_clean) {
if (_tablet_map[tablet_id_to_clean].table_arr.empty()) {
_tablet_map.erase(tablet_id_to_clean);
}
}
}

auto it = _shutdown_tablets.begin();
for (; it != _shutdown_tablets.end();) {
// check if the meta has the tablet info and its state is shutdown
if (it->use_count() > 1) {
// it means current tablet is referenced in other thread
++it;
continue;
}
TabletMetaSharedPtr new_tablet_meta(new(nothrow) TabletMeta());
if (new_tablet_meta == nullptr) {
LOG(WARNING) << "fail to malloc TabletMeta.";
++it;
continue;
}
OLAPStatus check_st = TabletMetaManager::get_header((*it)->data_dir(),
(*it)->tablet_id(), (*it)->schema_hash(), new_tablet_meta);
if (check_st == OLAP_SUCCESS) {
if (new_tablet_meta->tablet_state() != TABLET_SHUTDOWN
|| new_tablet_meta->tablet_uid() != (*it)->tablet_uid()) {
LOG(WARNING) << "tablet's state changed to normal, skip remove dirs"
<< " tablet id = " << new_tablet_meta->tablet_id()
<< " schema hash = " << new_tablet_meta->schema_hash()
<< " old tablet_uid=" << (*it)->tablet_uid()
<< " cur tablet_uid=" << new_tablet_meta->tablet_uid();
// remove it from list
it = _shutdown_tablets.erase(it);
int32_t clean_num = 0;
do {
sleep(1);
clean_num = 0;
ReadLock rlock(&_tablet_map_lock);
auto it = _shutdown_tablets.begin();
for (; it != _shutdown_tablets.end();) {
// check if the meta has the tablet info and its state is shutdown
if (it->use_count() > 1) {
// it means current tablet is referenced in other thread
++it;
continue;
}
if (check_dir_existed((*it)->tablet_path())) {
// take snapshot of tablet meta
std::string meta_file = (*it)->tablet_path() + "/" + std::to_string((*it)->tablet_id()) + ".hdr";
(*it)->tablet_meta()->save(meta_file);
LOG(INFO) << "start to move path to trash"
<< " tablet path = " << (*it)->tablet_path();
OLAPStatus rm_st = move_to_trash((*it)->tablet_path(), (*it)->tablet_path());
if (rm_st != OLAP_SUCCESS) {
LOG(WARNING) << "failed to move dir to trash"
<< " dir = " << (*it)->tablet_path();
++it;
TabletMetaSharedPtr new_tablet_meta(new(nothrow) TabletMeta());
if (new_tablet_meta == nullptr) {
LOG(WARNING) << "fail to malloc TabletMeta.";
++it;
continue;
}
OLAPStatus check_st = TabletMetaManager::get_header((*it)->data_dir(),
(*it)->tablet_id(), (*it)->schema_hash(), new_tablet_meta);
if (check_st == OLAP_SUCCESS) {
if (new_tablet_meta->tablet_state() != TABLET_SHUTDOWN
|| new_tablet_meta->tablet_uid() != (*it)->tablet_uid()) {
LOG(WARNING) << "tablet's state changed to normal, skip remove dirs"
<< " tablet id = " << new_tablet_meta->tablet_id()
<< " schema hash = " << new_tablet_meta->schema_hash()
<< " old tablet_uid=" << (*it)->tablet_uid()
<< " cur tablet_uid=" << new_tablet_meta->tablet_uid();
// remove it from list
it = _shutdown_tablets.erase(it);
continue;
}
}
TabletMetaManager::remove((*it)->data_dir(), (*it)->tablet_id(), (*it)->schema_hash());
LOG(INFO) << "successfully move tablet to trash."
<< " tablet id " << (*it)->tablet_id()
<< " schema hash " << (*it)->schema_hash()
<< " tablet path " << (*it)->tablet_path();
it = _shutdown_tablets.erase(it);
} else {
// if could not find tablet info in meta store, then check if dir existed
if (check_dir_existed((*it)->tablet_path())) {
LOG(WARNING) << "errors while load meta from store, skip this tablet"
if (check_dir_existed((*it)->tablet_path())) {
// take snapshot of tablet meta
std::string meta_file = (*it)->tablet_path() + "/" + std::to_string((*it)->tablet_id()) + ".hdr";
(*it)->tablet_meta()->save(meta_file);
LOG(INFO) << "start to move path to trash"
<< " tablet path = " << (*it)->tablet_path();
OLAPStatus rm_st = move_to_trash((*it)->tablet_path(), (*it)->tablet_path());
if (rm_st != OLAP_SUCCESS) {
LOG(WARNING) << "failed to move dir to trash"
<< " dir = " << (*it)->tablet_path();
++it;
continue;
}
}
TabletMetaManager::remove((*it)->data_dir(), (*it)->tablet_id(), (*it)->schema_hash());
LOG(INFO) << "successfully move tablet to trash."
<< " tablet id " << (*it)->tablet_id()
<< " schema hash " << (*it)->schema_hash();
++it;
} else {
LOG(INFO) << "could not find tablet dir, skip move to trash, remove it from gc queue."
<< " tablet id " << (*it)->tablet_id()
<< " schema hash " << (*it)->schema_hash()
<< " tablet path " << (*it)->tablet_path();
<< " schema hash " << (*it)->schema_hash()
<< " tablet path " << (*it)->tablet_path();
it = _shutdown_tablets.erase(it);
++ clean_num;
} else {
// if could not find tablet info in meta store, then check if dir existed
if (check_dir_existed((*it)->tablet_path())) {
LOG(WARNING) << "errors while load meta from store, skip this tablet"
<< " tablet id " << (*it)->tablet_id()
<< " schema hash " << (*it)->schema_hash();
++it;
} else {
LOG(INFO) << "could not find tablet dir, skip move to trash, remove it from gc queue."
<< " tablet id " << (*it)->tablet_id()
<< " schema hash " << (*it)->schema_hash()
<< " tablet path " << (*it)->tablet_path();
it = _shutdown_tablets.erase(it);
}
}

// if clean 100 tablets, should yield
if (clean_num >= 200) {
break;
}
}
}
} while (clean_num >= 200);
return OLAP_SUCCESS;
} // start_trash_sweep

Expand Down