From 52e6afe773470fbe8d0d4247736fe0cb05f054fb Mon Sep 17 00:00:00 2001 From: ByteYue Date: Mon, 13 Feb 2023 11:55:11 +0800 Subject: [PATCH 1/2] use async write cooldown meta --- be/src/agent/task_worker_pool.cpp | 7 ++-- be/src/olap/cold_data_compaction.cpp | 3 +- be/src/olap/tablet.cpp | 56 +++++++++++++++++++++++++--- be/src/olap/tablet.h | 31 +++++++++++++++ 4 files changed, 86 insertions(+), 11 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f1f61fdb660d4a..9a29b5f50fc73a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1847,9 +1847,10 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id; continue; } - tablet->update_cooldown_conf(cooldown_conf.cooldown_term, - cooldown_conf.cooldown_replica_id); - // TODO(AlexYue): if `update_cooldown_conf` success, async call `write_cooldown_meta` + if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, + cooldown_conf.cooldown_replica_id)) { + tablet->async_write_cooldown_meta(); + } } } } diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp index 9fbeef824503b4..cc4b4a6e07f182 100644 --- a/be/src/olap/cold_data_compaction.cpp +++ b/be/src/olap/cold_data_compaction.cpp @@ -80,8 +80,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) { _tablet->save_meta(); } // write remote tablet meta - // TODO(AlexYue): async call `write_cooldown_meta` - RETURN_IF_ERROR(_tablet->write_cooldown_meta()); + _tablet->async_write_cooldown_meta(); return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index d20fa26e77e201..68c81d744d05d6 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -95,6 +95,31 @@ bvar::Adder exceed_version_limit_counter; bvar::Window> exceed_version_limit_counter_minute( &exceed_version_limit_counter, 60); +void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::function task) { + { + std::unique_lock lck {_latch}; + if (_pengding_tablets.count(tablet_id) > 0) { + return; + } + _pengding_tablets.insert(tablet_id); + } + _executors[_get_executor_pos(tablet_id)]->submit_func( + [this, tablet_id, t = std::move(task)] { _do_task(tablet_id, t); }); +} + +void WriteCooldownMetaExecutors::_do_task(int64_t tablet_id, std::function task) { + auto s = task(); + if (!s.ok()) { + // we need to make sure the cooldown meta task is done in the end + LOG_INFO("write cooldown meta failed because: {}", s); + _executors[_get_executor_pos(tablet_id)]->submit_func( + [this, tablet_id, t = std::move(task)] { _do_task(tablet_id, t); }); + return; + } + std::unique_lock lck {_latch}; + _pengding_tablets.erase(tablet_id); +} + TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) { return std::make_shared(tablet_meta, data_dir); @@ -1769,8 +1794,7 @@ Status Tablet::_cooldown_data() { save_meta(); } // upload cooldowned rowset meta to remote fs - // TODO(AlexYue): async call `write_cooldown_meta` - RETURN_IF_ERROR(write_cooldown_meta()); + async_write_cooldown_meta(); return Status::OK(); } @@ -1810,11 +1834,31 @@ Status check_version_continuity(const std::vector& rs_metas return Status::OK(); } -Status Tablet::write_cooldown_meta() { +// It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes +// one tablet would at most have one async task to be done +void Tablet::async_write_cooldown_meta() { + WriteCooldownMetaExecutors::GetInstance()->submit(tablet_id(), [this]() { + std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock); + if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf + LOG_INFO("tablet {} cancel write cooldown meta because invalid cooldown_replica_id", + tablet_id()); + return Status::OK(); + } + auto [cooldown_replica_id, cooldown_term] = cooldown_conf(); + if (cooldown_replica_id != replica_id()) { + LOG_INFO( + "tablet {} cancel write cooldown meta because this replica is not cooldown " + "replica", + tablet_id()); + return Status::OK(); + } + return _write_cooldown_meta(); + }); +} + +// hold SHARED `cooldown_conf_lock` +Status Tablet::_write_cooldown_meta() { auto [cooldown_replica_id, cooldown_term] = cooldown_conf(); - if (cooldown_replica_id != replica_id()) { - return Status::Aborted("this replica is not cooldown replica"); - } std::shared_ptr fs; RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 5931faab828789..301c9db9253eee 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -64,6 +64,35 @@ using TabletSharedPtr = std::shared_ptr; enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL }; +class WriteCooldownMetaExecutors { +public: + WriteCooldownMetaExecutors(size_t executor_nums = 5) : _executor_nums(executor_nums) { + for (size_t i = 0; i < _executor_nums; i++) { + std::unique_ptr pool; + ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor") + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1024) + .build(&pool); + _executors.emplace_back(std::move(pool)); + } + } + + static WriteCooldownMetaExecutors* GetInstance() { + static WriteCooldownMetaExecutors instance; + return &instance; + } + + void submit(int64_t tablet_id, std::function task); + +private: + void _do_task(int64_t tablet_id, std::function task); + size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; }; + std::vector> _executors; + std::unordered_set _pengding_tablets; + std::mutex _latch; + size_t _executor_nums; +}; class Tablet : public BaseTablet { public: static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, @@ -365,6 +394,7 @@ class Tablet : public BaseTablet { std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; } Status write_cooldown_meta(); + void async_write_cooldown_meta(); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// @@ -476,6 +506,7 @@ class Tablet : public BaseTablet { Status _follow_cooldowned_data(); Status _read_cooldown_meta(const std::shared_ptr& fs, TabletMetaPB* tablet_meta_pb); + Status _write_cooldown_meta(); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// From f3dbb378eaddbe603e913c2c65f477a203c0e215 Mon Sep 17 00:00:00 2001 From: ByteYue Date: Tue, 7 Mar 2023 11:24:43 +0800 Subject: [PATCH 2/2] use static async write cooldown meta --- be/src/agent/task_worker_pool.cpp | 2 +- be/src/olap/cold_data_compaction.cpp | 2 +- be/src/olap/tablet.cpp | 79 ++++++++++++++++----------- be/src/olap/tablet.h | 32 +---------- be/test/olap/tablet_cooldown_test.cpp | 2 + 5 files changed, 51 insertions(+), 66 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 9a29b5f50fc73a..733eb7a001f195 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1849,7 +1849,7 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { } if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, cooldown_conf.cooldown_replica_id)) { - tablet->async_write_cooldown_meta(); + Tablet::async_write_cooldown_meta(tablet); } } } diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp index cc4b4a6e07f182..2a27d92acb930f 100644 --- a/be/src/olap/cold_data_compaction.cpp +++ b/be/src/olap/cold_data_compaction.cpp @@ -80,7 +80,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) { _tablet->save_meta(); } // write remote tablet meta - _tablet->async_write_cooldown_meta(); + Tablet::async_write_cooldown_meta(_tablet); return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 68c81d744d05d6..b1f25b67ab629f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -95,7 +95,33 @@ bvar::Adder exceed_version_limit_counter; bvar::Window> exceed_version_limit_counter_minute( &exceed_version_limit_counter, 60); -void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::function task) { +struct WriteCooldownMetaExecutors { + WriteCooldownMetaExecutors(size_t executor_nums = 5) : _executor_nums(executor_nums) { + for (size_t i = 0; i < _executor_nums; i++) { + std::unique_ptr pool; + ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor") + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(std::numeric_limits::max()) + .build(&pool); + _executors.emplace_back(std::move(pool)); + } + } + + static WriteCooldownMetaExecutors* GetInstance() { + static WriteCooldownMetaExecutors instance; + return &instance; + } + + void submit(int64_t tablet_id, std::function task); + size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; }; + std::vector> _executors; + std::unordered_set _pengding_tablets; + std::mutex _latch; + size_t _executor_nums; +}; + +void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::function task) { { std::unique_lock lck {_latch}; if (_pengding_tablets.count(tablet_id) > 0) { @@ -103,21 +129,7 @@ void WriteCooldownMetaExecutors::submit(int64_t tablet_id, std::functionsubmit_func( - [this, tablet_id, t = std::move(task)] { _do_task(tablet_id, t); }); -} - -void WriteCooldownMetaExecutors::_do_task(int64_t tablet_id, std::function task) { - auto s = task(); - if (!s.ok()) { - // we need to make sure the cooldown meta task is done in the end - LOG_INFO("write cooldown meta failed because: {}", s); - _executors[_get_executor_pos(tablet_id)]->submit_func( - [this, tablet_id, t = std::move(task)] { _do_task(tablet_id, t); }); - return; - } - std::unique_lock lck {_latch}; - _pengding_tablets.erase(tablet_id); + _executors[_get_executor_pos(tablet_id)]->submit_func([task = std::move(task)]() { task(); }); } TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, @@ -1794,7 +1806,7 @@ Status Tablet::_cooldown_data() { save_meta(); } // upload cooldowned rowset meta to remote fs - async_write_cooldown_meta(); + async_write_cooldown_meta(StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id())); return Status::OK(); } @@ -1836,24 +1848,25 @@ Status check_version_continuity(const std::vector& rs_metas // It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes // one tablet would at most have one async task to be done -void Tablet::async_write_cooldown_meta() { - WriteCooldownMetaExecutors::GetInstance()->submit(tablet_id(), [this]() { - std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock); - if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf - LOG_INFO("tablet {} cancel write cooldown meta because invalid cooldown_replica_id", - tablet_id()); - return Status::OK(); +void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) { + auto tablet_id = tablet->tablet_id(); + auto async_write_task = [t = std::move(tablet)]() { + auto ex = WriteCooldownMetaExecutors::GetInstance(); + { + std::unique_lock lck {ex->_latch}; + ex->_pengding_tablets.erase(t->tablet_id()); } - auto [cooldown_replica_id, cooldown_term] = cooldown_conf(); - if (cooldown_replica_id != replica_id()) { - LOG_INFO( - "tablet {} cancel write cooldown meta because this replica is not cooldown " - "replica", - tablet_id()); - return Status::OK(); + auto s = t->_write_cooldown_meta(); + if (s.ok()) { + return; } - return _write_cooldown_meta(); - }); + LOG_WARNING("write tablet {} cooldown meta failed because: {}", t->tablet_id(), + s.to_string()); + if (!s.is()) { + ex->submit(t->tablet_id(), [t]() { Tablet::async_write_cooldown_meta(t); }); + } + }; + WriteCooldownMetaExecutors::GetInstance()->submit(tablet_id, std::move(async_write_task)); } // hold SHARED `cooldown_conf_lock` diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 301c9db9253eee..e22bebf8900bc8 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -64,35 +64,6 @@ using TabletSharedPtr = std::shared_ptr; enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL }; -class WriteCooldownMetaExecutors { -public: - WriteCooldownMetaExecutors(size_t executor_nums = 5) : _executor_nums(executor_nums) { - for (size_t i = 0; i < _executor_nums; i++) { - std::unique_ptr pool; - ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor") - .set_min_threads(1) - .set_max_threads(1) - .set_max_queue_size(1024) - .build(&pool); - _executors.emplace_back(std::move(pool)); - } - } - - static WriteCooldownMetaExecutors* GetInstance() { - static WriteCooldownMetaExecutors instance; - return &instance; - } - - void submit(int64_t tablet_id, std::function task); - -private: - void _do_task(int64_t tablet_id, std::function task); - size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; }; - std::vector> _executors; - std::unordered_set _pengding_tablets; - std::mutex _latch; - size_t _executor_nums; -}; class Tablet : public BaseTablet { public: static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, @@ -393,8 +364,7 @@ class Tablet : public BaseTablet { std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; } - Status write_cooldown_meta(); - void async_write_cooldown_meta(); + static void async_write_cooldown_meta(TabletSharedPtr tablet); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index b842f82d597a4a..63f256129b3c0c 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -366,6 +366,7 @@ TEST_F(TabletCooldownTest, normal) { ASSERT_EQ(Status::OK(), st); st = tablet1->cooldown(); // rowset [2-2] ASSERT_EQ(Status::OK(), st); + sleep(30); auto rs = tablet1->get_rowset_by_version({2, 2}); ASSERT_FALSE(rs->is_local()); @@ -391,6 +392,7 @@ TEST_F(TabletCooldownTest, normal) { tablet2->update_cooldown_conf(2, kReplicaId); st = tablet2->cooldown(); // rowset [0-1] ASSERT_EQ(Status::OK(), st); + sleep(30); auto rs2 = tablet2->get_rowset_by_version({2, 2}); ASSERT_FALSE(rs2->is_local());