diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index b38f8a584fbd08..9397a426bc5727 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -131,13 +131,13 @@ Status bthread_fork_join(const std::vector>& tasks, int return status; } -Status bthread_fork_join(const std::vector>& tasks, int concurrency, +Status bthread_fork_join(std::vector>&& tasks, int concurrency, std::future* fut) { // std::function will cause `copy`, we need to use heap memory to avoid copy ctor called auto prom = std::make_shared>(); *fut = prom->get_future(); - std::function* fn = - new std::function([&tasks, concurrency, p = std::move(prom)]() mutable { + std::function* fn = new std::function( + [tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable { p->set_value(bthread_fork_join(tasks, concurrency)); }); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index deed11bb2837ba..2fdd435245beac 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -58,7 +58,7 @@ Status bthread_fork_join(const std::vector>& tasks, int // An async wrap of `bthread_fork_join` declared previously using promise-future // return OK if fut successfully created, otherwise return error -Status bthread_fork_join(const std::vector>& tasks, int concurrency, +Status bthread_fork_join(std::vector>&& tasks, int concurrency, std::future* fut); class CloudMetaMgr { diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 3299d22d38a647..597358b89134a1 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -463,7 +463,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { _cloud_tablet_dependency = Dependency::create_shared( _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); _tablets.resize(_scan_ranges.size()); - _tasks.reserve(_scan_ranges.size()); + std::vector> tasks; _sync_statistics.resize(_scan_ranges.size()); for (size_t i = 0; i < _scan_ranges.size(); i++) { auto* sync_stats = &_sync_statistics[i]; @@ -472,7 +472,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); auto task_ctx = state->get_task_execution_context(); - _tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { + tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { auto task_lock = task_ctx.lock(); if (task_lock == nullptr) { return Status::OK(); @@ -497,8 +497,9 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { return Status::OK(); }); } - RETURN_IF_ERROR(cloud::bthread_fork_join( - _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); + RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks), + config::init_scanner_sync_rowsets_parallelism, + &_cloud_tablet_future)); } _sync_tablet = true; } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 60b9d9b485fbb8..9d07006d07d902 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -98,7 +98,6 @@ class OlapScanLocalState final : public ScanLocalState { std::vector> _scan_ranges; std::vector _sync_statistics; - std::vector> _tasks; MonotonicStopWatch _sync_cloud_tablets_watcher; std::shared_ptr _cloud_tablet_dependency; std::atomic _pending_tablets_num = 0; diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp b/be/test/cloud/cloud_meta_mgr_test.cpp index 43611c6e4e4f82..b938d949553881 100644 --- a/be/test/cloud/cloud_meta_mgr_test.cpp +++ b/be/test/cloud/cloud_meta_mgr_test.cpp @@ -52,7 +52,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) { { std::future fut; auto start = steady_clock::now(); - EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately + auto t = tasks; + EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately auto end = steady_clock::now(); auto elapsed = duration_cast(end - start).count(); EXPECT_LE(elapsed, 40); // async @@ -74,7 +75,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) { { std::future fut; auto start = steady_clock::now(); - EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately + auto t = tasks; + EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately auto end = steady_clock::now(); auto elapsed = duration_cast(end - start).count(); EXPECT_LE(elapsed, 40); // async