Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
return status;
}

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
std::future<Status>* fut) {
// std::function will cause `copy`, we need to use heap memory to avoid copy ctor called
auto prom = std::make_shared<std::promise<Status>>();
*fut = prom->get_future();
std::function<void()>* fn =
new std::function<void()>([&tasks, concurrency, p = std::move(prom)]() mutable {
std::function<void()>* fn = new std::function<void()>(
[tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable {
p->set_value(bthread_fork_join(tasks, concurrency));
});

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int

// An async wrap of `bthread_fork_join` declared previously using promise-future
// return OK if fut successfully created, otherwise return error
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
std::future<Status>* fut);

class CloudMetaMgr {
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
_cloud_tablet_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");
_tablets.resize(_scan_ranges.size());
_tasks.reserve(_scan_ranges.size());
std::vector<std::function<Status()>> tasks;
_sync_statistics.resize(_scan_ranges.size());
for (size_t i = 0; i < _scan_ranges.size(); i++) {
auto* sync_stats = &_sync_statistics[i];
Expand All @@ -472,7 +472,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
_scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
version);
auto task_ctx = state->get_task_execution_context();
_tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return Status::OK();
Expand All @@ -497,8 +497,9 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
return Status::OK();
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(
_tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future));
RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks),
config::init_scanner_sync_rowsets_parallelism,
&_cloud_tablet_future));
}
_sync_tablet = true;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {

std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
std::vector<SyncRowsetStats> _sync_statistics;
std::vector<std::function<Status()>> _tasks;
MonotonicStopWatch _sync_cloud_tablets_watcher;
std::shared_ptr<Dependency> _cloud_tablet_dependency;
std::atomic<size_t> _pending_tablets_num = 0;
Expand Down
6 changes: 4 additions & 2 deletions be/test/cloud/cloud_meta_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
{
std::future<Status> fut;
auto start = steady_clock::now();
EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately
auto t = tasks;
EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
auto end = steady_clock::now();
auto elapsed = duration_cast<milliseconds>(end - start).count();
EXPECT_LE(elapsed, 40); // async
Expand All @@ -74,7 +75,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
{
std::future<Status> fut;
auto start = steady_clock::now();
EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately
auto t = tasks;
EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
auto end = steady_clock::now();
auto elapsed = duration_cast<milliseconds>(end - start).count();
EXPECT_LE(elapsed, 40); // async
Expand Down
Loading