Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1847,9 +1847,10 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
continue;
}
tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
cooldown_conf.cooldown_replica_id);
// TODO(AlexYue): if `update_cooldown_conf` success, async call `write_cooldown_meta`
if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
cooldown_conf.cooldown_replica_id)) {
Tablet::async_write_cooldown_meta(tablet);
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/cold_data_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
_tablet->save_meta();
}
// write remote tablet meta
// TODO(AlexYue): async call `write_cooldown_meta`
RETURN_IF_ERROR(_tablet->write_cooldown_meta());
Tablet::async_write_cooldown_meta(_tablet);
return Status::OK();
}

Expand Down
69 changes: 63 additions & 6 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,43 @@ bvar::Adder<uint64_t> exceed_version_limit_counter;
bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
&exceed_version_limit_counter, 60);

struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5) : _executor_nums(executor_nums) {
for (size_t i = 0; i < _executor_nums; i++) {
std::unique_ptr<ThreadPool> pool;
ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor")
.set_min_threads(1)
.set_max_threads(1)
.set_max_queue_size(std::numeric_limits<int>::max())
.build(&pool);
_executors.emplace_back(std::move(pool));
}
}

static WriteCooldownMetaExecutors* GetInstance() {
static WriteCooldownMetaExecutors instance;
return &instance;
}

void submit(int64_t tablet_id, std::function<void()> task);
size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; };
std::vector<std::unique_ptr<ThreadPool>> _executors;
std::unordered_set<int64_t> _pengding_tablets;
std::mutex _latch;
size_t _executor_nums;
};

void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::function<void()> task) {
{
std::unique_lock<std::mutex> lck {_latch};
if (_pengding_tablets.count(tablet_id) > 0) {
return;
}
_pengding_tablets.insert(tablet_id);
}
_executors[_get_executor_pos(tablet_id)]->submit_func([task = std::move(task)]() { task(); });
}

TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
DataDir* data_dir) {
return std::make_shared<Tablet>(tablet_meta, data_dir);
Expand Down Expand Up @@ -1769,8 +1806,7 @@ Status Tablet::_cooldown_data() {
save_meta();
}
// upload cooldowned rowset meta to remote fs
// TODO(AlexYue): async call `write_cooldown_meta`
RETURN_IF_ERROR(write_cooldown_meta());
async_write_cooldown_meta(StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id()));
return Status::OK();
}

Expand Down Expand Up @@ -1810,11 +1846,32 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
return Status::OK();
}

Status Tablet::write_cooldown_meta() {
// It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes
// one tablet would at most have one async task to be done
void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
auto tablet_id = tablet->tablet_id();
auto async_write_task = [t = std::move(tablet)]() {
auto ex = WriteCooldownMetaExecutors::GetInstance();
{
std::unique_lock<std::mutex> lck {ex->_latch};
ex->_pengding_tablets.erase(t->tablet_id());
}
auto s = t->_write_cooldown_meta();
if (s.ok()) {
return;
}
LOG_WARNING("write tablet {} cooldown meta failed because: {}", t->tablet_id(),
s.to_string());
if (!s.is<ABORTED>()) {
ex->submit(t->tablet_id(), [t]() { Tablet::async_write_cooldown_meta(t); });
}
};
WriteCooldownMetaExecutors::GetInstance()->submit(tablet_id, std::move(async_write_task));
}

// hold SHARED `cooldown_conf_lock`
Status Tablet::_write_cooldown_meta() {
auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
if (cooldown_replica_id != replica_id()) {
return Status::Aborted("this replica is not cooldown replica");
}

std::shared_ptr<io::RemoteFileSystem> fs;
RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class Tablet : public BaseTablet {

std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }

Status write_cooldown_meta();
static void async_write_cooldown_meta(TabletSharedPtr tablet);
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -476,6 +476,7 @@ class Tablet : public BaseTablet {
Status _follow_cooldowned_data();
Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
TabletMetaPB* tablet_meta_pb);
Status _write_cooldown_meta();
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions be/test/olap/tablet_cooldown_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ TEST_F(TabletCooldownTest, normal) {
ASSERT_EQ(Status::OK(), st);
st = tablet1->cooldown(); // rowset [2-2]
ASSERT_EQ(Status::OK(), st);
sleep(30);
auto rs = tablet1->get_rowset_by_version({2, 2});
ASSERT_FALSE(rs->is_local());

Expand All @@ -391,6 +392,7 @@ TEST_F(TabletCooldownTest, normal) {
tablet2->update_cooldown_conf(2, kReplicaId);
st = tablet2->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
sleep(30);
auto rs2 = tablet2->get_rowset_by_version({2, 2});
ASSERT_FALSE(rs2->is_local());

Expand Down