diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f1f61fdb660d4a..733eb7a001f195 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1847,9 +1847,10 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
                 LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
                 continue;
             }
-            tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
-                                         cooldown_conf.cooldown_replica_id);
-            // TODO(AlexYue): if `update_cooldown_conf` success, async call `write_cooldown_meta`
+            if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
+                                             cooldown_conf.cooldown_replica_id)) {
+                Tablet::async_write_cooldown_meta(tablet);
+            }
         }
     }
 }
diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp
index 9fbeef824503b4..2a27d92acb930f 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -80,8 +80,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
         _tablet->save_meta();
     }
     // write remote tablet meta
-    // TODO(AlexYue): async call `write_cooldown_meta`
-    RETURN_IF_ERROR(_tablet->write_cooldown_meta());
+    Tablet::async_write_cooldown_meta(_tablet);
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index d20fa26e77e201..b1f25b67ab629f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -95,6 +95,48 @@ bvar::Adder<uint64_t> exceed_version_limit_counter;
 bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
         &exceed_version_limit_counter, 60);
 
+// Fixed set of single-threaded pools used to write cooldown meta off the caller's thread.
+// A tablet id always maps to the same pool, and `_pending_tablets` keeps at most one
+// queued write task per tablet.
+struct WriteCooldownMetaExecutors {
+    WriteCooldownMetaExecutors(size_t executor_nums = 5) : _executor_nums(executor_nums) {
+        for (size_t i = 0; i < _executor_nums; i++) {
+            std::unique_ptr<ThreadPool> pool;
+            ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor")
+                    .set_min_threads(1)
+                    .set_max_threads(1)
+                    .set_max_queue_size(std::numeric_limits<int>::max())
+                    .build(&pool);
+            _executors.emplace_back(std::move(pool));
+        }
+    }
+
+    static WriteCooldownMetaExecutors* GetInstance() {
+        static WriteCooldownMetaExecutors instance;
+        return &instance;
+    }
+
+    // Queues `task` for `tablet_id` unless that tablet already has a queued task.
+    void submit(int64_t tablet_id, std::function<void()> task);
+    size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; };
+    std::vector<std::unique_ptr<ThreadPool>> _executors;
+    // ids of tablets with a queued task that has not started yet; guarded by `_latch`
+    std::unordered_set<int64_t> _pending_tablets;
+    std::mutex _latch;
+    size_t _executor_nums;
+};
+
+void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::function<void()> task) {
+    {
+        std::unique_lock<std::mutex> lck {_latch};
+        if (_pending_tablets.count(tablet_id) > 0) {
+            return;
+        }
+        _pending_tablets.insert(tablet_id);
+    }
+    _executors[_get_executor_pos(tablet_id)]->submit_func([task = std::move(task)]() { task(); });
+}
+
 TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
                                                 DataDir* data_dir) {
     return std::make_shared<Tablet>(tablet_meta, data_dir);
@@ -1769,8 +1811,7 @@ Status Tablet::_cooldown_data() {
         save_meta();
     }
     // upload cooldowned rowset meta to remote fs
-    // TODO(AlexYue): async call `write_cooldown_meta`
-    RETURN_IF_ERROR(write_cooldown_meta());
+    async_write_cooldown_meta(StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id()));
     return Status::OK();
 }
 
@@ -1810,11 +1851,39 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
     return Status::OK();
 }
 
-Status Tablet::write_cooldown_meta() {
+// Schedules an asynchronous upload of this tablet's cooldown meta.
+// A tablet has at most one queued task at a time; a failed write is re-queued unless the
+// status is ABORTED.
+void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
+    if (tablet == nullptr) {
+        // callers may pass the result of a tablet lookup, which can be null
+        return;
+    }
+    auto tablet_id = tablet->tablet_id();
+    auto async_write_task = [t = std::move(tablet)]() {
+        auto ex = WriteCooldownMetaExecutors::GetInstance();
+        {
+            std::unique_lock<std::mutex> lck {ex->_latch};
+            ex->_pending_tablets.erase(t->tablet_id());
+        }
+        auto s = t->_write_cooldown_meta();
+        if (s.ok()) {
+            return;
+        }
+        LOG_WARNING("write tablet {} cooldown meta failed because: {}", t->tablet_id(),
+                    s.to_string());
+        if (!s.is<ABORTED>()) {
+            // Re-queue directly. The pending mark was cleared above, so this is accepted;
+            // wrapping the retry in another submit() would leave the mark set forever.
+            Tablet::async_write_cooldown_meta(t);
+        }
+    };
+    WriteCooldownMetaExecutors::GetInstance()->submit(tablet_id, std::move(async_write_task));
+}
+
+// hold SHARED `cooldown_conf_lock`
+Status Tablet::_write_cooldown_meta() {
     auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
-    if (cooldown_replica_id != replica_id()) {
-        return Status::Aborted("this replica is not cooldown replica");
-    }
 
     std::shared_ptr<io::RemoteFileSystem> fs;
     RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 5931faab828789..e22bebf8900bc8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -364,7 +364,7 @@ class Tablet : public BaseTablet {
 
     std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
 
-    Status write_cooldown_meta();
+    static void async_write_cooldown_meta(TabletSharedPtr tablet);
     ////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     ////////////////////////////////////////////////////////////////////////////
@@ -476,6 +476,7 @@ class Tablet : public BaseTablet {
     Status _follow_cooldowned_data();
     Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
                                TabletMetaPB* tablet_meta_pb);
+    Status _write_cooldown_meta();
     ////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     ////////////////////////////////////////////////////////////////////////////
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index b842f82d597a4a..63f256129b3c0c 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -366,6 +366,7 @@ TEST_F(TabletCooldownTest, normal) {
     ASSERT_EQ(Status::OK(), st);
     st = tablet1->cooldown(); // rowset [2-2]
     ASSERT_EQ(Status::OK(), st);
+    sleep(30);
     auto rs = tablet1->get_rowset_by_version({2, 2});
     ASSERT_FALSE(rs->is_local());
 
@@ -391,6 +392,7 @@ TEST_F(TabletCooldownTest, normal) {
     tablet2->update_cooldown_conf(2, kReplicaId);
     st = tablet2->cooldown(); // rowset [0-1]
     ASSERT_EQ(Status::OK(), st);
+    sleep(30);
     auto rs2 = tablet2->get_rowset_by_version({2, 2});
     ASSERT_FALSE(rs2->is_local());
 