Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 68 additions & 68 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSha
}

TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
: _tablet_map_lock_shard_size(tablet_map_lock_shard_size), _last_update_stat_ms(0) {
DCHECK_GT(_tablet_map_lock_shard_size, 0);
DCHECK_EQ(_tablet_map_lock_shard_size & (tablet_map_lock_shard_size - 1), 0);
_tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size];
_tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size];
}

TabletManager::~TabletManager() {
delete[] _tablet_map_lock_array;
delete[] _tablet_map_array;
: _tablets_shards_size(tablet_map_lock_shard_size),
_tablets_shards_mask(tablet_map_lock_shard_size - 1),
_last_update_stat_ms(0) {
CHECK_GT(_tablets_shards_size, 0);
CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0);
_tablets_shards.resize(_tablets_shards_size);
for (auto& tablets_shard : _tablets_shards) {
tablets_shard.lock = std::unique_ptr<RWMutex>(new RWMutex());
}
}

OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
Expand Down Expand Up @@ -192,8 +191,7 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, Schem
}

// Locked wrapper around _check_tablet_id_exist_unlocked().
// Takes, in read mode, the lock that guards the shard tablet_id maps to and
// returns whatever the unlocked check reports for tablet_id.
bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
ReadLock rlock(_get_tablets_shard_lock(tablet_id));
return _check_tablet_id_exist_unlocked(tablet_id);
}

Expand All @@ -212,8 +210,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash;

RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wrlock(&tablet_map_lock);
WriteLock wlock(_get_tablets_shard_lock(tablet_id));
// Make create_tablet operation to be idempotent:
// 1. Return true if tablet with same tablet_id and schema_hash exist;
// false if tablet with same tablet_id but different schema_hash exist.
Expand Down Expand Up @@ -439,8 +436,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(

// Locked entry point for dropping a tablet.
// Takes, in write mode, the lock that guards the shard tablet_id maps to,
// then forwards tablet_id, schema_hash and keep_files unchanged to
// _drop_tablet_unlocked() and returns its status.
OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool keep_files) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
WriteLock wlock(_get_tablets_shard_lock(tablet_id));
return _drop_tablet_unlocked(tablet_id, schema_hash, keep_files);
}

Expand Down Expand Up @@ -540,12 +536,11 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, SchemaHash
OLAPStatus TabletManager::drop_tablets_on_error_root_path(
const std::vector<TabletInfo>& tablet_info_vec) {
OLAPStatus res = OLAP_SUCCESS;
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
RWMutex& tablet_map_lock = _tablet_map_lock_array[i];
WriteLock wlock(&tablet_map_lock);
for (int32 i = 0; i < _tablets_shards_size; i++) {
WriteLock wlock(_tablets_shards[i].lock.get());
for (const TabletInfo& tablet_info : tablet_info_vec) {
TTabletId tablet_id = tablet_info.tablet_id;
if ((tablet_id & (_tablet_map_lock_shard_size - 1)) != i) {
if ((tablet_id & _tablets_shards_mask) != i) {
continue;
}
TSchemaHash schema_hash = tablet_info.schema_hash;
Expand Down Expand Up @@ -577,8 +572,7 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path(

// Locked lookup of a tablet by (tablet_id, schema_hash).
// Takes, in read mode, the lock that guards the shard tablet_id maps to,
// then forwards every argument (including include_deleted and the err
// out-parameter) to _get_tablet_unlocked() and returns its result.
TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted, string* err) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
ReadLock rlock(_get_tablets_shard_lock(tablet_id));
return _get_tablet_unlocked(tablet_id, schema_hash, include_deleted, err);
}

Expand Down Expand Up @@ -618,8 +612,7 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH

TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid, bool include_deleted, string* err) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
ReadLock rlock(_get_tablets_shard_lock(tablet_id));
TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, schema_hash, include_deleted, err);
if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) {
return tablet;
Expand Down Expand Up @@ -685,11 +678,10 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
uint32_t compaction_score = 0;
double tablet_scan_frequency = 0.0;
TabletSharedPtr best_tablet;
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]);
tablet_map_t& tablet_map = _tablet_map_array[i];
for (tablet_map_t::value_type& table_ins : tablet_map) {
for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& tablet_map : tablets_shard.tablet_map) {
for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
std::vector<TTabletId>::iterator it_tablet =
find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
tablet_ptr->tablet_id());
Expand Down Expand Up @@ -788,8 +780,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id,
TSchemaHash schema_hash, const string& meta_binary,
bool update_meta, bool force, bool restore) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
WriteLock wlock(_get_tablets_shard_lock(tablet_id));
TabletMetaSharedPtr tablet_meta(new TabletMeta());
OLAPStatus status = tablet_meta->deserialize(meta_binary);
if (status != OLAP_SUCCESS) {
Expand Down Expand Up @@ -904,8 +895,7 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_

void TabletManager::release_schema_change_lock(TTabletId tablet_id) {
VLOG(3) << "release_schema_change_lock begin. tablet_id=" << tablet_id;
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
ReadLock rlock(_get_tablets_shard_lock(tablet_id));

tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map_t::iterator it = tablet_map.find(tablet_id);
Expand Down Expand Up @@ -949,11 +939,10 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*

DorisMetrics::instance()->report_all_tablets_requests_total->increment(1);

for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock rlock(&_tablet_map_lock_array[i]);
tablet_map_t& tablet_map = _tablet_map_array[i];
for (const auto& item : tablet_map) {
if (item.second.table_arr.size() == 0) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& item : tablets_shard.tablet_map) {
if (item.second.table_arr.empty()) {
continue;
}

Expand Down Expand Up @@ -987,10 +976,10 @@ OLAPStatus TabletManager::start_trash_sweep() {
std::vector<int64_t> tablets_to_clean;
std::vector<TabletSharedPtr>
all_tablets; // we use this vector to save all tablet ptr for saving lock time.
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
tablet_map_t& tablet_map = _tablet_map_array[i];
for (auto& tablets_shard : _tablets_shards) {
tablet_map_t& tablet_map = tablets_shard.tablet_map;
{
ReadLock r_lock(&_tablet_map_lock_array[i]);
ReadLock rlock(tablets_shard.lock.get());
for (auto& item : tablet_map) {
// try to clean empty item
if (item.second.table_arr.empty()) {
Expand All @@ -1009,7 +998,7 @@ OLAPStatus TabletManager::start_trash_sweep() {
all_tablets.clear();

if (!tablets_to_clean.empty()) {
WriteLock w_lock(&_tablet_map_lock_array[i]);
WriteLock wlock(tablets_shard.lock.get());
// clean empty tablet id item
for (const auto& tablet_id_to_clean : tablets_to_clean) {
auto& item = tablet_map[tablet_id_to_clean];
Expand Down Expand Up @@ -1109,24 +1098,24 @@ OLAPStatus TabletManager::start_trash_sweep() {
} // start_trash_sweep

// Records tablet_id in the set of tablets that are under clone.
// The insertion is performed while holding, in write mode, the lock that
// guards the shard tablet_id maps to.
void TabletManager::register_clone_tablet(int64_t tablet_id) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
_tablets_under_clone.insert(tablet_id);
tablets_shard& shard = _get_tablets_shard(tablet_id);
WriteLock wlock(shard.lock.get());
shard.tablets_under_clone.insert(tablet_id);
}

// Removes tablet_id from the set of tablets that are under clone.
// The erase is performed while holding, in write mode, the lock that
// guards the shard tablet_id maps to.
void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
_tablets_under_clone.erase(tablet_id);
tablets_shard& shard = _get_tablets_shard(tablet_id);
WriteLock wlock(shard.lock.get());
shard.tablets_under_clone.erase(tablet_id);
}

void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
tablets_shard& shard = _get_tablets_shard(tablet_id);
ReadLock rlock(shard.lock.get());

// check if meta already exists
TabletMetaSharedPtr tablet_meta(new TabletMeta());
Expand All @@ -1137,7 +1126,7 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
return;
}

if (_tablets_under_clone.count(tablet_id) > 0) {
if (shard.tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet is under clone, skip delete the path " << schema_hash_path;
return;
}
Expand All @@ -1158,8 +1147,7 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
bool TabletManager::try_schema_change_lock(TTabletId tablet_id) {
bool res = false;
VLOG(3) << "try_schema_change_lock begin. tablet_id=" << tablet_id;
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);
ReadLock rlock(_get_tablets_shard_lock(tablet_id));
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map_t::iterator it = tablet_map.find(tablet_id);
if (it == tablet_map.end()) {
Expand All @@ -1175,9 +1163,9 @@ void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
size_t* tablet_count) {
DCHECK(tablet_count != 0);
*tablet_count = 0;
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock rlock(&_tablet_map_lock_array[i]);
for (auto& entry : _tablet_map_array[i]) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& entry : tablets_shard.tablet_map) {
const TableInstances& instance = entry.second;
for (auto& tablet : instance.table_arr) {
++(*tablet_count);
Expand Down Expand Up @@ -1205,10 +1193,10 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id,
void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
std::vector<TabletSharedPtr> related_tablets;
{
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]);
for (tablet_map_t::value_type& table_ins : _tablet_map_array[i]) {
for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& tablet_map : tablets_shard.tablet_map) {
for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
if (tablet_ptr->tablet_state() != TABLET_RUNNING) {
continue;
}
Expand All @@ -1230,10 +1218,10 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {

void TabletManager::_build_tablet_stat() {
_tablet_stat_cache.clear();
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock rdlock(&_tablet_map_lock_array[i]);
for (const auto& item : _tablet_map_array[i]) {
if (item.second.table_arr.size() == 0) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& item : tablets_shard.tablet_map) {
if (item.second.table_arr.empty()) {
continue;
}

Expand Down Expand Up @@ -1454,11 +1442,11 @@ void TabletManager::_remove_tablet_from_partition(const Tablet& tablet) {
}
}

void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info,
void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo> &tablets_info,
int64_t num) {
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock rdlock(&_tablet_map_lock_array[i]);
for (const auto& item : _tablet_map_array[i]) {
for (const auto& tablets_shard : _tablets_shards) {
ReadLock rlock(tablets_shard.lock.get());
for (const auto& item : tablets_shard.tablet_map) {
for (TabletSharedPtr tablet : item.second.table_arr) {
if (tablets_info.size() >= num) {
return;
Expand All @@ -1475,4 +1463,16 @@ void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets
}
}

// Returns the RWMutex that guards the shard tabletId maps to.
// The returned pointer is non-owning: the mutex stays owned by the shard's
// std::unique_ptr, so callers must not delete it.
RWMutex* TabletManager::_get_tablets_shard_lock(TTabletId tabletId) {
    auto& owning_shard = _get_tablets_shard(tabletId);
    return owning_shard.lock.get();
}

// Returns a mutable reference to the tablet map held by the shard tabletId
// maps to. This helper takes no lock itself; the callers visible in this
// file acquire the shard lock before calling it.
TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) {
    auto& owning_shard = _get_tablets_shard(tabletId);
    return owning_shard.tablet_map;
}

// Maps a tablet id to its shard by masking the id with _tablets_shards_mask
// and using the result as an index into _tablets_shards.
TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tabletId) {
    const auto shard_index = tabletId & _tablets_shards_mask;
    return _tablets_shards[shard_index];
}

} // end namespace doris
32 changes: 15 additions & 17 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DataDir;
class TabletManager {
public:
TabletManager(int32_t tablet_map_lock_shard_size);
~TabletManager();
~TabletManager() = default;

bool check_tablet_id_exist(TTabletId tablet_id);

Expand Down Expand Up @@ -182,7 +182,7 @@ class TabletManager {

void _remove_tablet_from_partition(const Tablet& tablet);

inline RWMutex& _get_tablet_map_lock(TTabletId tabletId);
RWMutex* _get_tablets_shard_lock(TTabletId tabletId);

private:
DISALLOW_COPY_AND_ASSIGN(TabletManager);
Expand All @@ -199,10 +199,17 @@ class TabletManager {
// tablet_id -> TabletInstances
typedef std::unordered_map<int64_t, TableInstances> tablet_map_t;

const int32_t _tablet_map_lock_shard_size;
// _tablet_map_lock_array[i] protect _tablet_map_array[i], i=0,1,2...,and i < _tablet_map_lock_shard_size
RWMutex* _tablet_map_lock_array;
tablet_map_t* _tablet_map_array;
// One shard of the tablet registry. A tablet id is mapped to its shard by
// _get_tablets_shard(), i.e. tablet_id & _tablets_shards_mask.
struct tablets_shard {
// Guards tablet_map, tablets_under_clone and tablets_under_restore of this shard.
// Allocated once per shard by the TabletManager constructor.
// NOTE(review): presumably held through a pointer so that tablets_shard can be
// stored in a std::vector — confirm against RWMutex's copy/move semantics.
std::unique_ptr<RWMutex> lock;
// tablet_id -> TableInstances, for the tablet ids that map to this shard.
tablet_map_t tablet_map;
// Tablet ids added by register_clone_tablet() and removed by unregister_clone_tablet().
std::set<int64_t> tablets_under_clone;
// NOTE(review): no reader or writer of this set is visible in this excerpt;
// presumably the ids of tablets being restored — confirm.
std::set<int64_t> tablets_under_restore;
};

const int32_t _tablets_shards_size;
const int32_t _tablets_shards_mask;
std::vector<tablets_shard> _tablets_shards;

// Protects _partition_tablet_map. Should not be acquired before _tablet_map_lock, to avoid deadlock.
RWMutex _partition_tablet_map_lock;
Expand All @@ -219,20 +226,11 @@ class TabletManager {
// last update time of tablet stat cache
int64_t _last_update_stat_ms;

inline tablet_map_t& _get_tablet_map(TTabletId tablet_id);
tablet_map_t& _get_tablet_map(TTabletId tablet_id);

std::set<int64_t> _tablets_under_clone;
std::set<int64_t> _tablets_under_restore;
tablets_shard& _get_tablets_shard(TTabletId tabletId);
};

// Returns the lock for the shard tabletId maps to. The shard index is the
// tablet id masked with (_tablet_map_lock_shard_size - 1).
inline RWMutex& TabletManager::_get_tablet_map_lock(TTabletId tabletId) {
    const auto shard_index = tabletId & (_tablet_map_lock_shard_size - 1);
    return _tablet_map_lock_array[shard_index];
}

// Returns the tablet map for the shard tabletId maps to. The shard index is
// the tablet id masked with (_tablet_map_lock_shard_size - 1).
inline TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) {
    const auto shard_index = tabletId & (_tablet_map_lock_shard_size - 1);
    return _tablet_map_array[shard_index];
}

} // namespace doris

#endif // DORIS_BE_SRC_OLAP_TABLET_MANAGER_H