From 585a6ba3631167515fd7e0d4ca0afff5e169ed45 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 7 Jul 2025 15:54:33 +0800 Subject: [PATCH 01/17] [Improvement](cloud) Hold tablets in another RPC thread --- be/src/common/config.cpp | 3 -- be/src/common/config.h | 3 -- .../exec/multi_cast_data_stream_source.h | 2 +- be/src/pipeline/exec/olap_scan_operator.cpp | 23 +++++++--- be/src/pipeline/exec/olap_scan_operator.h | 18 +++++++- be/src/pipeline/exec/operator.h | 4 +- be/src/pipeline/exec/scan_operator.h | 10 ++--- be/src/pipeline/pipeline_task.cpp | 44 ++++++++++--------- be/src/pipeline/pipeline_task.h | 4 +- be/test/pipeline/pipeline_task_test.cpp | 22 +++++----- 10 files changed, 78 insertions(+), 55 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d63570ff4dffa9..7f57fbe9459216 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1050,9 +1050,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5"); // enable java udf and jdbc scannode DEFINE_Bool(enable_java_support, "true"); -// enable prefetch tablets before opening -DEFINE_mBool(enable_prefetch_tablet, "true"); - // Set config randomly to check more issues in github workflow DEFINE_Bool(enable_fuzzy_mode, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a867bfccfada96..65a4bcba0d1ee6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1093,9 +1093,6 @@ DECLARE_mInt32(segcompaction_num_threads); // enable java udf and jdbc scannode DECLARE_Bool(enable_java_support); -// enable prefetch tablets before opening -DECLARE_mBool(enable_prefetch_tablet); - // Set config randomly to check more issues in github workflow DECLARE_Bool(enable_fuzzy_mode); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 16823757fdf869..2de1c09ffe8c6a 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -50,7 +50,7 @@ class MultiCastDataStreamSourceLocalState final Status close(RuntimeState* state) override; friend class MultiCastDataStreamerSourceOperatorX; - std::vector filter_dependencies() override { + std::vector execution_dependencies() override { if (_filter_dependencies.empty()) { return {}; } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index f9c994c56faf51..d293e3b6e7c645 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -356,7 +356,6 @@ Status OlapScanLocalState::_init_scanners(std::list* sc bool has_cpu_limit = state()->query_options().__isset.resource_limit && state()->query_options().resource_limit.__isset.cpu_limit; - RETURN_IF_ERROR(hold_tablets()); if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && p._push_down_agg_type == TPushAggOp::NONE && (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { @@ -465,7 +464,7 @@ Status OlapScanLocalState::hold_tablets() { std::vector> tasks {}; tasks.reserve(_scan_ranges.size()); int64_t duration_ns {0}; - { + if (!_sync_tablet) { SCOPED_RAW_TIMER(&duration_ns); for (size_t i = 0; i < _scan_ranges.size(); i++) { auto* sync_stats = &sync_statistics[i]; @@ -474,6 +473,7 @@ Status OlapScanLocalState::hold_tablets() { _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); tasks.emplace_back([this, sync_stats, version, i]() { + Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); _tablets[i] = {std::move(tablet), version}; @@ -488,9 +488,15 @@ Status OlapScanLocalState::hold_tablets() { return Status::OK(); }); } - RETURN_IF_ERROR( - cloud::bthread_fork_join(tasks, config::init_scanner_sync_rowsets_parallelism)); + RETURN_IF_ERROR(cloud::bthread_fork_join( + tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); + _sync_tablet = true; + return Status::OK(); } + DCHECK(std::all_of( + _cloud_tablet_dependencies.begin(), _cloud_tablet_dependencies.end(), + [&](DependencySPtr dep) -> bool { return !dep->is_blocked_by(nullptr); })); + DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok()); COUNTER_UPDATE(_sync_rowset_timer, duration_ns); auto total_rowsets = std::accumulate( _tablets.cbegin(), _tablets.cend(), 0LL, @@ -550,7 +556,6 @@ Status OlapScanLocalState::hold_tablets() { _sync_rowset_get_remote_delete_bitmap_rpc_timer->value(), TUnit::TIME_NS)); } - } else { for (size_t i = 0; i < _scan_ranges.size(); i++) { int64_t version = 0; @@ -614,6 +619,14 @@ void OlapScanLocalState::set_scan_ranges(RuntimeState* state, COUNTER_UPDATE(_tablet_counter, 1); } } + if (config::is_cloud_mode()) { + _cloud_tablet_dependencies.resize(_scan_ranges.size()); + for (size_t i = 0; i < _scan_ranges.size(); i++) { + _cloud_tablet_dependencies[i] = + Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "CLOUD_TABLET_DEP_" + std::to_string(i)); + } + } } static std::string olap_filter_to_string(const doris::TCondition& condition) { diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index f77d914579d3d6..386ca11de1f975 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -40,9 +40,9 @@ class OlapScanOperatorX; class OlapScanLocalState final : public ScanLocalState { public: using Parent = OlapScanOperatorX; + using Base = ScanLocalState; ENABLE_FACTORY_CREATOR(OlapScanLocalState); - OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalState(state, parent) {} + OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} TOlapScanNode& olap_scan_node() const; @@ -52,6 +52,17 @@ class OlapScanLocalState final : public ScanLocalState { std::to_string(_parent->nereids_id()), olap_scan_node().table_name); } Status hold_tablets(); + std::vector execution_dependencies() override { + if (_cloud_tablet_dependencies.empty()) { + return Base::execution_dependencies(); + } + std::vector res(_cloud_tablet_dependencies.size()); + std::transform(_cloud_tablet_dependencies.begin(), _cloud_tablet_dependencies.end(), + res.begin(), [](DependencySPtr dep) { return dep.get(); }); + std::vector tmp = Base::execution_dependencies(); + std::copy(tmp.begin(), tmp.end(), std::inserter(res, res.end())); + return res; + } private: friend class vectorized::OlapScanner; @@ -90,6 +101,9 @@ class OlapScanLocalState final : public ScanLocalState { Status _build_key_ranges_and_filters(); std::vector> _scan_ranges; + std::vector> _cloud_tablet_dependencies; + std::future _cloud_tablet_future; + std::atomic_bool _sync_tablet = false; std::vector> _cond_ranges; OlapScanKeys _scan_keys; std::vector> _olap_filters; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 491bdf9a06e93e..0deaecbc5f27fc 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -201,7 +201,7 @@ class PipelineXLocalStateBase { virtual Dependency* finishdependency() { return nullptr; } virtual Dependency* spill_dependency() const { return nullptr; } // override in Scan MultiCastSink - virtual std::vector filter_dependencies() { return {}; } + virtual std::vector execution_dependencies() { return {}; } Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts, vectorized::Block* block, size_t column_to_keep); @@ -1116,7 +1116,7 @@ class DummyOperatorLocalState final : public PipelineXLocalState dependencies() const override { return {_tmp_dependency.get()}; } - std::vector filter_dependencies() override { return {_filter_dependency.get()}; } + std::vector execution_dependencies() override { return {_filter_dependency.get()}; } Dependency* spill_dependency() const override { return _spill_dependency.get(); } private: diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index a053fa66909a28..b1d692d288fdf2 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -156,15 +156,13 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; - std::vector filter_dependencies() override { + std::vector execution_dependencies() override { if (_filter_dependencies.empty()) { return {}; } - std::vector res; - res.resize(_filter_dependencies.size()); - for (size_t i = 0; i < _filter_dependencies.size(); i++) { - res[i] = _filter_dependencies[i].get(); - } + std::vector res(_filter_dependencies.size()); + std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(), + [](DependencySPtr dep) { return dep.get(); }); return res; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 1629f3c504468e..8784521e40a105 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -78,9 +78,9 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState _sink(pipeline->sink_shared_pointer()), _shared_state_map(std::move(shared_state_map)), _task_idx(task_idx), - _execution_dep(state->get_query_ctx()->get_execution_dependency()), _memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()), _pipeline_name(_pipeline->name()) { + _execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency()); if (!_shared_state_map.contains(_sink->dests_id().front())) { auto shared_state = _sink->create_shared_state(); if (shared_state) { @@ -119,13 +119,11 @@ Status PipelineTask::prepare(const std::vector& scan_range, co parent_profile = _state->get_local_state(op->operator_id())->profile(); } { - std::vector filter_dependencies; - const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); - std::copy(deps.begin(), deps.end(), - std::inserter(filter_dependencies, filter_dependencies.end())); - + const auto& deps = + _state->get_local_state(_source->operator_id())->execution_dependencies(); std::unique_lock lc(_dependency_lock); - filter_dependencies.swap(_filter_dependencies); + std::copy(deps.begin(), deps.end(), + std::inserter(_execution_dependencies, _execution_dependencies.end())); } if (auto fragment = _fragment_context.lock()) { if (fragment->get_query_ctx()->is_cancelled()) { @@ -263,10 +261,10 @@ bool PipelineTask::_wait_to_start() { // Before task starting, we should make sure // 1. Execution dependency is ready (which is controlled by FE 2-phase commit) // 2. Runtime filter dependencies are ready - return _execution_dep->is_blocked_by(shared_from_this()) || - std::any_of( - _filter_dependencies.begin(), _filter_dependencies.end(), - [&](Dependency* dep) -> bool { return dep->is_blocked_by(shared_from_this()); }); + // 3. All tablets are loaded into local storage + return std::any_of( + _execution_dependencies.begin(), _execution_dependencies.end(), + [&](Dependency* dep) -> bool { return dep->is_blocked_by(shared_from_this()); }); } bool PipelineTask::_is_pending_finish() { @@ -320,8 +318,6 @@ void PipelineTask::terminate() { DCHECK(_wake_up_early || fragment->is_canceled()); std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(), [&](Dependency* dep) { dep->set_always_ready(); }); - std::for_each(_filter_dependencies.begin(), _filter_dependencies.end(), - [&](Dependency* dep) { dep->set_always_ready(); }); std::for_each(_write_dependencies.begin(), _write_dependencies.end(), [&](Dependency* dep) { dep->set_always_ready(); }); std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(), @@ -331,7 +327,9 @@ void PipelineTask::terminate() { std::for_each(deps.begin(), deps.end(), [&](Dependency* dep) { dep->set_always_ready(); }); }); - _execution_dep->set_ready(); + // All `_execution_deps` will never be set blocking from ready. So we just set ready here. + std::for_each(_execution_dependencies.begin(), _execution_dependencies.end(), + [&](Dependency* dep) { dep->set_ready(); }); _memory_sufficient_dependency->set_ready(); } } @@ -404,13 +402,19 @@ Status PipelineTask::execute(bool* done) { Status status = Status::Error("fault_inject pipeline_task execute failed"); return status; }); + // `hold_tablets` is designed as a reentrant method. For cloud mode, we reach here first to + // request remote tablets by RPC, and then hold local tablets by the second call. + if (!_hold_cloud_tablet && !_wake_up_early) { + RETURN_IF_ERROR(_source->hold_tablets(_state)); + } // `_wake_up_early` must be after `_wait_to_start()` if (_wait_to_start() || _wake_up_early) { - if (config::enable_prefetch_tablet) { - RETURN_IF_ERROR(_source->hold_tablets(_state)); - } return Status::OK(); } + if (!_hold_cloud_tablet && !_wake_up_early) { + RETURN_IF_ERROR(_source->hold_tablets(_state)); + } + _hold_cloud_tablet = true; // The status must be runnable if (!_opened && !fragment_context->is_canceled()) { @@ -740,10 +744,10 @@ std::string PipelineTask::debug_string() { _write_dependencies[j]->debug_string(i + 1)); } - fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n"); - for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "\nExecution Dependency Information: \n"); + for (size_t j = 0; j < _execution_dependencies.size(); j++, i++) { fmt::format_to(debug_string_buffer, "{}. {}\n", i, - _filter_dependencies[j]->debug_string(i + 1)); + _execution_dependencies[j]->debug_string(i + 1)); } fmt::format_to(debug_string_buffer, "\nSpill Dependency Information: \n"); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 9c65b24d8cd32f..e15e18b4aa9104 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -238,7 +238,8 @@ class PipelineTask : public std::enable_shared_from_this { std::vector _spill_dependencies; std::vector _write_dependencies; std::vector _finish_dependencies; - std::vector _filter_dependencies; + std::vector _execution_dependencies; + std::atomic_bool _hold_cloud_tablet = false; // All shared states of this pipeline task. std::map> _op_shared_states; @@ -253,7 +254,6 @@ class PipelineTask : public std::enable_shared_from_this { unsigned long long _exec_time_slice = config::pipeline_task_exec_time_slice * NANOS_PER_MILLIS; Dependency* _blocked_dep = nullptr; - Dependency* _execution_dep = nullptr; Dependency* _memory_sufficient_dependency; std::mutex _dependency_lock; diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index 94ebf2a111af02..f5a08fd56e6606 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -318,7 +318,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies() @@ -340,7 +340,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { { // task is blocked by filter dependency. _query_ctx->get_execution_dependency()->set_ready(); - task->_filter_dependencies.front()->block(); + task->_execution_dependencies.back()->block(); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); bool done = false; EXPECT_TRUE(task->execute(&done).ok()); @@ -348,8 +348,8 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { EXPECT_FALSE(done); EXPECT_FALSE(task->_wake_up_early); EXPECT_FALSE(task->_opened); - EXPECT_FALSE(task->_filter_dependencies.front()->ready()); - EXPECT_FALSE(task->_filter_dependencies.front()->_blocked_task.empty()); + EXPECT_FALSE(task->_execution_dependencies.back()->ready()); + EXPECT_FALSE(task->_execution_dependencies.back()->_blocked_task.empty()); EXPECT_TRUE(task->_read_dependencies.empty()); EXPECT_TRUE(task->_write_dependencies.empty()); EXPECT_TRUE(task->_finish_dependencies.empty()); @@ -358,7 +358,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { } { // `open` phase. And then task is blocked by read dependency. - task->_filter_dependencies.front()->set_ready(); + task->_execution_dependencies.back()->set_ready(); read_dep->block(); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); bool done = false; @@ -446,7 +446,7 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -511,7 +511,7 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } for (int i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) { auto target = (PipelineTask::State)i; @@ -556,7 +556,7 @@ TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -637,7 +637,7 @@ TEST_F(PipelineTaskTest, TEST_SINK_EOF) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -718,7 +718,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies() @@ -854,7 +854,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies() From 2021a3c92292e983707281c16b7a3828c743a752 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 7 Jul 2025 16:21:40 +0800 Subject: [PATCH 02/17] update --- be/src/pipeline/exec/operator.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0deaecbc5f27fc..720f34eb8f59e5 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -1116,7 +1116,9 @@ class DummyOperatorLocalState final : public PipelineXLocalState dependencies() const override { return {_tmp_dependency.get()}; } - std::vector execution_dependencies() override { return {_filter_dependency.get()}; } + std::vector execution_dependencies() override { + return {_filter_dependency.get()}; + } Dependency* spill_dependency() const override { return _spill_dependency.get(); } private: From e936d8fd51eaeaa6d523611f9846c05e6b102836 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 8 Jul 2025 10:17:45 +0800 Subject: [PATCH 03/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 92 +++++++++++---------- be/src/pipeline/exec/olap_scan_operator.h | 13 +-- be/src/pipeline/exec/scan_operator.cpp | 1 + be/src/pipeline/exec/scan_operator.h | 2 + be/src/pipeline/pipeline_task.cpp | 5 -- 5 files changed, 57 insertions(+), 56 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index d293e3b6e7c645..9ffefaf29934c5 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -449,6 +449,49 @@ Status OlapScanLocalState::_init_scanners(std::list* sc return Status::OK(); } +Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { + if (config::is_cloud_mode() && !_sync_tablet) { + _cloud_tablet_dependencies.resize(_scan_ranges.size()); + for (size_t i = 0; i < _scan_ranges.size(); i++) { + _cloud_tablet_dependencies[i] = + Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "CLOUD_TABLET_DEP_" + std::to_string(i)); + } + + std::vector> tasks {}; + tasks.reserve(_scan_ranges.size()); + _sync_statistics.resize(_scan_ranges.size()); + SCOPED_RAW_TIMER(&_duration_ns); + for (size_t i = 0; i < _scan_ranges.size(); i++) { + auto* sync_stats = &_sync_statistics[i]; + int64_t version = 0; + std::from_chars(_scan_ranges[i]->version.data(), + _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), + version); + tasks.emplace_back([this, sync_stats, version, i]() { + Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); + auto tablet = + DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); + _tablets[i] = {std::move(tablet), version}; + SyncOptions options; + options.query_version = version; + options.merge_schema = true; + RETURN_IF_ERROR(std::dynamic_pointer_cast(_tablets[i].tablet) + ->sync_rowsets(options, sync_stats)); + // FIXME(plat1ko): Avoid pointer cast + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( + *_tablets[i].tablet); + return Status::OK(); + }); + } + RETURN_IF_ERROR(cloud::bthread_fork_join( + tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); + _sync_tablet = true; + return Status::OK(); + } + return Status::OK(); +} + Status OlapScanLocalState::hold_tablets() { if (!_tablets.empty()) { return Status::OK(); @@ -460,51 +503,18 @@ Status OlapScanLocalState::hold_tablets() { _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { - std::vector sync_statistics(_scan_ranges.size()); - std::vector> tasks {}; - tasks.reserve(_scan_ranges.size()); - int64_t duration_ns {0}; - if (!_sync_tablet) { - SCOPED_RAW_TIMER(&duration_ns); - for (size_t i = 0; i < _scan_ranges.size(); i++) { - auto* sync_stats = &sync_statistics[i]; - int64_t version = 0; - std::from_chars(_scan_ranges[i]->version.data(), - _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), - version); - tasks.emplace_back([this, sync_stats, version, i]() { - Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); - auto tablet = - DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); - _tablets[i] = {std::move(tablet), version}; - SyncOptions options; - options.query_version = version; - options.merge_schema = true; - RETURN_IF_ERROR(std::dynamic_pointer_cast(_tablets[i].tablet) - ->sync_rowsets(options, sync_stats)); - // FIXME(plat1ko): Avoid pointer cast - ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( - *_tablets[i].tablet); - return Status::OK(); - }); - } - RETURN_IF_ERROR(cloud::bthread_fork_join( - tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); - _sync_tablet = true; - return Status::OK(); - } DCHECK(std::all_of( _cloud_tablet_dependencies.begin(), _cloud_tablet_dependencies.end(), [&](DependencySPtr dep) -> bool { return !dep->is_blocked_by(nullptr); })); DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok()); - COUNTER_UPDATE(_sync_rowset_timer, duration_ns); + COUNTER_UPDATE(_sync_rowset_timer, _duration_ns); auto total_rowsets = std::accumulate( _tablets.cbegin(), _tablets.cend(), 0LL, [](long long acc, const auto& tabletWithVersion) { return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size(); }); COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets); - for (const auto& sync_stats : sync_statistics) { + for (const auto& sync_stats : _sync_statistics) { COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit); COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss); COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer, @@ -523,14 +533,14 @@ Status OlapScanLocalState::hold_tablets() { COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer, sync_stats.get_remote_delete_bitmap_rpc_ns); } - auto time_ms = duration_ns / 1000 / 1000; + auto time_ms = _duration_ns / 1000 / 1000; if (time_ms >= config::sync_rowsets_slow_threshold_ms) { DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms); DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1); LOG_WARNING("get tablet takes too long") .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id())) .tag("node_id", _parent->node_id()) - .tag("total_time", PrettyPrinter::print(duration_ns, TUnit::TIME_NS)) + .tag("total_time", PrettyPrinter::print(_duration_ns, TUnit::TIME_NS)) .tag("num_tablets", _tablets.size()) .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value()) .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value()) @@ -619,14 +629,6 @@ void OlapScanLocalState::set_scan_ranges(RuntimeState* state, COUNTER_UPDATE(_tablet_counter, 1); } } - if (config::is_cloud_mode()) { - _cloud_tablet_dependencies.resize(_scan_ranges.size()); - for (size_t i = 0; i < _scan_ranges.size(); i++) { - _cloud_tablet_dependencies[i] = - Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "CLOUD_TABLET_DEP_" + std::to_string(i)); - } - } } static std::string olap_filter_to_string(const doris::TCondition& condition) { diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 386ca11de1f975..508c5c440c66f9 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -21,20 +21,18 @@ #include +#include "cloud/cloud_tablet.h" #include "common/status.h" #include "olap/tablet_reader.h" #include "operator.h" #include "pipeline/exec/scan_operator.h" -namespace doris { -#include "common/compile_check_begin.h" - -namespace vectorized { +namespace doris::vectorized { class OlapScanner; -} -} // namespace doris +} // namespace doris::vectorized namespace doris::pipeline { +#include "common/compile_check_begin.h" class OlapScanOperatorX; class OlapScanLocalState final : public ScanLocalState { @@ -63,6 +61,7 @@ class OlapScanLocalState final : public ScanLocalState { std::copy(tmp.begin(), tmp.end(), std::inserter(res, res.end())); return res; } + Status sync_cloud_tablets(RuntimeState* state) override; private: friend class vectorized::OlapScanner; @@ -101,6 +100,8 @@ class OlapScanLocalState final : public ScanLocalState { Status _build_key_ranges_and_filters(); std::vector> _scan_ranges; + std::vector _sync_statistics; + int64_t _duration_ns = 0; std::vector> _cloud_tablet_dependencies; std::future _cloud_tablet_future; std::atomic_bool _sync_tablet = false; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 44d87b73c1d62a..2a9357db532b73 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -81,6 +81,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) _filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY")); RETURN_IF_ERROR(_init_profile()); set_scan_ranges(state, info.scan_ranges); + RETURN_IF_ERROR(sync_cloud_tablets(state)); _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); return Status::OK(); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index b1d692d288fdf2..90d51d4bf56b3b 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -80,6 +80,7 @@ class ScanLocalStateBase : public PipelineXLocalState<> { virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; virtual void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) = 0; + virtual Status sync_cloud_tablets(RuntimeState* state) = 0; virtual TPushAggOp::type get_push_down_agg_type() = 0; @@ -151,6 +152,7 @@ class ScanLocalState : public ScanLocalStateBase { Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) override; void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override {} + Status sync_cloud_tablets(RuntimeState* state) override { return Status::OK(); } TPushAggOp::type get_push_down_agg_type() override; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 8784521e40a105..ecad6a6758b04a 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -402,11 +402,6 @@ Status PipelineTask::execute(bool* done) { Status status = Status::Error("fault_inject pipeline_task execute failed"); return status; }); - // `hold_tablets` is designed as a reentrant method. For cloud mode, we reach here first to - // request remote tablets by RPC, and then hold local tablets by the second call. - if (!_hold_cloud_tablet && !_wake_up_early) { - RETURN_IF_ERROR(_source->hold_tablets(_state)); - } // `_wake_up_early` must be after `_wait_to_start()` if (_wait_to_start() || _wake_up_early) { return Status::OK(); From 088a6678ed1c3311734942b3a7371bf95786258c Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 8 Jul 2025 12:43:49 +0800 Subject: [PATCH 04/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 7 +++---- be/src/pipeline/exec/olap_scan_operator.h | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 9ffefaf29934c5..3841540452857a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -458,8 +458,7 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { "CLOUD_TABLET_DEP_" + std::to_string(i)); } - std::vector> tasks {}; - tasks.reserve(_scan_ranges.size()); + _tasks.reserve(_scan_ranges.size()); _sync_statistics.resize(_scan_ranges.size()); SCOPED_RAW_TIMER(&_duration_ns); for (size_t i = 0; i < _scan_ranges.size(); i++) { @@ -468,7 +467,7 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { std::from_chars(_scan_ranges[i]->version.data(), _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); - tasks.emplace_back([this, sync_stats, version, i]() { + _tasks.emplace_back([this, sync_stats, version, i]() { Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); @@ -485,7 +484,7 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { }); } RETURN_IF_ERROR(cloud::bthread_fork_join( - tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); + _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); _sync_tablet = true; return Status::OK(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 508c5c440c66f9..7c4c2d5138d09a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -101,6 +101,7 @@ class OlapScanLocalState final : public ScanLocalState { std::vector> _scan_ranges; std::vector _sync_statistics; + std::vector> _tasks; int64_t _duration_ns = 0; std::vector> _cloud_tablet_dependencies; std::future _cloud_tablet_future; From b5f1f090ae980496484f828f69f1efb94b37f565 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 8 Jul 2025 15:18:02 +0800 Subject: [PATCH 05/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 3841540452857a..a8af1a0c941130 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -458,6 +458,7 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { "CLOUD_TABLET_DEP_" + std::to_string(i)); } + _tablets.resize(_scan_ranges.size()); _tasks.reserve(_scan_ranges.size()); _sync_statistics.resize(_scan_ranges.size()); SCOPED_RAW_TIMER(&_duration_ns); @@ -498,7 +499,6 @@ Status OlapScanLocalState::hold_tablets() { MonotonicStopWatch timer; timer.start(); - _tablets.resize(_scan_ranges.size()); _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { From 4eebf9cf16903cbbd8f3f3003acb5144ad38e9f3 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 8 Jul 2025 22:28:10 +0800 Subject: [PATCH 06/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index a8af1a0c941130..278a57faaf7292 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -493,10 +493,6 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { } Status OlapScanLocalState::hold_tablets() { - if (!_tablets.empty()) { - return Status::OK(); - } - MonotonicStopWatch timer; timer.start(); _read_sources.resize(_scan_ranges.size()); From 206c79a0d3b33b5ba39b28ed1ac3418a866d07a6 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Jul 2025 10:05:08 +0800 Subject: [PATCH 07/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 278a57faaf7292..c99d6c5dc7ddc5 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -468,7 +468,12 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { std::from_chars(_scan_ranges[i]->version.data(), _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); - _tasks.emplace_back([this, sync_stats, version, i]() { + auto task_ctx = state->get_task_execution_context(); + _tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + return Status::OK(); + } Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); From c9645b47164d8cde7fa6492b07b4b5eb0f5264b5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Jul 2025 11:56:46 +0800 Subject: [PATCH 08/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index c99d6c5dc7ddc5..f2eb49bd791dd4 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -567,6 +567,7 @@ Status OlapScanLocalState::hold_tablets() { TUnit::TIME_NS)); } } else { + _tablets.resize(_scan_ranges.size()); for (size_t i = 0; i < _scan_ranges.size(); i++) { int64_t version = 0; std::from_chars(_scan_ranges[i]->version.data(), From 8f0af0a6c5d9245f0fac350d399181d732ad1a90 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Jul 2025 18:48:55 +0800 Subject: [PATCH 09/17] udpate --- be/src/pipeline/exec/olap_scan_operator.cpp | 42 ++++++++++++--------- be/src/pipeline/exec/olap_scan_operator.h | 20 +++++----- be/src/pipeline/exec/operator.h | 6 ++- be/src/pipeline/exec/scan_operator.cpp | 1 - be/src/pipeline/exec/scan_operator.h | 3 -- be/src/pipeline/pipeline_task.cpp | 17 +++++++-- be/src/pipeline/pipeline_task.h | 2 +- 7 files changed, 51 insertions(+), 40 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index f2eb49bd791dd4..dac407fbae1752 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -46,6 +46,12 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" +Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + RETURN_IF_ERROR(_sync_cloud_tablets(state)); + return Status::OK(); +} + Status OlapScanLocalState::_init_profile() { RETURN_IF_ERROR(ScanLocalState::_init_profile()); // Rows read from storage. @@ -449,14 +455,11 @@ Status OlapScanLocalState::_init_scanners(std::list* sc return Status::OK(); } -Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { +Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { if (config::is_cloud_mode() && !_sync_tablet) { - _cloud_tablet_dependencies.resize(_scan_ranges.size()); - for (size_t i = 0; i < _scan_ranges.size(); i++) { - _cloud_tablet_dependencies[i] = - Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "CLOUD_TABLET_DEP_" + std::to_string(i)); - } + _cloud_tablet_dependency = Dependency::create_shared( + _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); + _pending_tablets_num = cast_set(_scan_ranges.size()); _tablets.resize(_scan_ranges.size()); _tasks.reserve(_scan_ranges.size()); @@ -474,7 +477,11 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { if (task_lock == nullptr) { return Status::OK(); } - Defer defer([&] { this->_cloud_tablet_dependencies[i]->set_ready(); }); + Defer defer([&] { + if (_pending_tablets_num.fetch_sub(1) == 1) { + _cloud_tablet_dependency->set_ready(); + } + }); auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); _tablets[i] = {std::move(tablet), version}; @@ -492,20 +499,23 @@ Status OlapScanLocalState::sync_cloud_tablets(RuntimeState* state) { RETURN_IF_ERROR(cloud::bthread_fork_join( _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); _sync_tablet = true; - return Status::OK(); } return Status::OK(); } -Status OlapScanLocalState::hold_tablets() { +Status OlapScanLocalState::prepare(RuntimeState* state) { + if (_prepared) { + return Status::OK(); + } MonotonicStopWatch timer; timer.start(); _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { - DCHECK(std::all_of( - _cloud_tablet_dependencies.begin(), _cloud_tablet_dependencies.end(), - [&](DependencySPtr dep) -> bool { return !dep->is_blocked_by(nullptr); })); + if (_cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { + // Remote tablet still in-flight. + return Status::OK(); + } DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok()); COUNTER_UPDATE(_sync_rowset_timer, _duration_ns); auto total_rowsets = std::accumulate( @@ -601,6 +611,7 @@ Status OlapScanLocalState::hold_tablets() { cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(), _scan_ranges.size()); } + _prepared = true; return Status::OK(); } @@ -783,10 +794,5 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i } } -Status OlapScanOperatorX::hold_tablets(RuntimeState* state) { - auto& local_state = ScanOperatorX::get_local_state(state); - return local_state.hold_tablets(); -} - #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 7c4c2d5138d09a..b29da164e20472 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -41,7 +41,8 @@ class OlapScanLocalState final : public ScanLocalState { using Base = ScanLocalState; ENABLE_FACTORY_CREATOR(OlapScanLocalState); OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} - + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status prepare(RuntimeState* state) override; TOlapScanNode& olap_scan_node() const; std::string name_suffix() const override { @@ -49,23 +50,19 @@ class OlapScanLocalState final : public ScanLocalState { std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()), olap_scan_node().table_name); } - Status hold_tablets(); std::vector execution_dependencies() override { - if (_cloud_tablet_dependencies.empty()) { + if (!_cloud_tablet_dependency) { return Base::execution_dependencies(); } - std::vector res(_cloud_tablet_dependencies.size()); - std::transform(_cloud_tablet_dependencies.begin(), _cloud_tablet_dependencies.end(), - res.begin(), [](DependencySPtr dep) { return dep.get(); }); - std::vector tmp = Base::execution_dependencies(); - std::copy(tmp.begin(), tmp.end(), std::inserter(res, res.end())); + std::vector res = Base::execution_dependencies(); + res.push_back(_cloud_tablet_dependency.get()); return res; } - Status sync_cloud_tablets(RuntimeState* state) override; private: friend class vectorized::OlapScanner; + Status _sync_cloud_tablets(RuntimeState* state); void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override; Status _init_profile() override; @@ -103,7 +100,9 @@ class OlapScanLocalState final : public ScanLocalState { std::vector _sync_statistics; std::vector> _tasks; int64_t _duration_ns = 0; - std::vector> _cloud_tablet_dependencies; + std::shared_ptr _cloud_tablet_dependency; + std::atomic_int32_t _pending_tablets_num = 0; + bool _prepared = false; std::future _cloud_tablet_future; std::atomic_bool _sync_tablet = false; std::vector> _cond_ranges; @@ -250,7 +249,6 @@ class OlapScanOperatorX final : public ScanOperatorX { OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& cache_param); - Status hold_tablets(RuntimeState* state) override; private: friend class OlapScanLocalState; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 720f34eb8f59e5..197125249c855e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -171,6 +171,7 @@ class PipelineXLocalStateBase { // Do initialization. This step should be executed only once and in bthread, so we can do some // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX) virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0; + virtual Status prepare(RuntimeState* state) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; @@ -261,6 +262,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase { ~PipelineXLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override; virtual std::string name_suffix() const; @@ -440,6 +442,7 @@ class PipelineXSinkLocalStateBase { // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX) virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0; + virtual Status prepare(RuntimeState* state) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; @@ -518,6 +521,7 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase { Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } Status terminate(RuntimeState* state) override; @@ -830,8 +834,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } - // Tablets should be hold before open phase. - [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); } Status prepare(RuntimeState* state) override; Status terminate(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 2a9357db532b73..44d87b73c1d62a 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -81,7 +81,6 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) _filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY")); RETURN_IF_ERROR(_init_profile()); set_scan_ranges(state, info.scan_ranges); - RETURN_IF_ERROR(sync_cloud_tablets(state)); _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); return Status::OK(); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 90d51d4bf56b3b..d3e043032c2313 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -80,8 +80,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> { virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; virtual void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) = 0; - virtual Status sync_cloud_tablets(RuntimeState* state) = 0; - virtual TPushAggOp::type get_push_down_agg_type() = 0; virtual int64_t get_push_down_count() = 0; @@ -152,7 +150,6 @@ class ScanLocalState : public ScanLocalStateBase { Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) override; void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override {} - Status sync_cloud_tablets(RuntimeState* state) override { return Status::OK(); } TPushAggOp::type get_push_down_agg_type() override; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index ecad6a6758b04a..bafabfe8d0f304 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -257,6 +257,16 @@ Status PipelineTask::_open() { return Status::OK(); } +Status PipelineTask::_prepare() { + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + for (auto& o : _operators) { + RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->prepare(_state)); + } + RETURN_IF_ERROR(_state->get_sink_local_state()->prepare(_state)); + return Status::OK(); +} + bool PipelineTask::_wait_to_start() { // Before task starting, we should make sure // 1. Execution dependency is ready (which is controlled by FE 2-phase commit) @@ -398,6 +408,9 @@ Status PipelineTask::execute(bool* done) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); + if (!_wake_up_early) { + RETURN_IF_ERROR(_prepare()); + } DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", { Status status = Status::Error("fault_inject pipeline_task execute failed"); return status; @@ -406,10 +419,6 @@ Status PipelineTask::execute(bool* done) { if (_wait_to_start() || _wake_up_early) { return Status::OK(); } - if (!_hold_cloud_tablet && !_wake_up_early) { - RETURN_IF_ERROR(_source->hold_tablets(_state)); - } - _hold_cloud_tablet = true; // The status must be runnable if (!_opened && !fragment_context->is_canceled()) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index e15e18b4aa9104..052b4a8017a42c 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -182,6 +182,7 @@ class PipelineTask : public std::enable_shared_from_this { void _init_profile(); void _fresh_profile_counter(); Status _open(); + Status _prepare(); // Operator `op` try to reserve memory before executing. Return false if reserve failed // otherwise return true. @@ -239,7 +240,6 @@ class PipelineTask : public std::enable_shared_from_this { std::vector _write_dependencies; std::vector _finish_dependencies; std::vector _execution_dependencies; - std::atomic_bool _hold_cloud_tablet = false; // All shared states of this pipeline task. std::map> _op_shared_states; From 69dc2f77e656cfcbb8020a9820d1a79c008eca97 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Jul 2025 18:52:40 +0800 Subject: [PATCH 10/17] update --- be/src/pipeline/exec/operator.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 197125249c855e..7326dc83ba9de8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -171,6 +171,10 @@ class PipelineXLocalStateBase { // Do initialization. This step should be executed only once and in bthread, so we can do some // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX) virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0; + // Make sure all resources are ready before execution. For example, remote tablets should be + // loaded to local storage. + // This is called by execution pthread and different from `Operator::prepare` which is called + // by bthread. virtual Status prepare(RuntimeState* state) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). From a00770a4b8bf8c31baeed3a4b0df79cb82d63542 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Jul 2025 22:53:08 +0800 Subject: [PATCH 11/17] update --- be/src/pipeline/pipeline_task.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index bafabfe8d0f304..c78a8b9acb5c40 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -419,6 +419,7 @@ Status PipelineTask::execute(bool* done) { if (_wait_to_start() || _wake_up_early) { return Status::OK(); } + RETURN_IF_ERROR(_prepare()); // The status must be runnable if (!_opened && !fragment_context->is_canceled()) { From 45a839e27eea5ddcfd498e23f618ca49cce3799e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Jul 2025 09:16:07 +0800 Subject: [PATCH 12/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index dac407fbae1752..f46e32ffa12205 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -457,14 +457,17 @@ Status OlapScanLocalState::_init_scanners(std::list* sc Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { if (config::is_cloud_mode() && !_sync_tablet) { - _cloud_tablet_dependency = Dependency::create_shared( - _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); + if (_scan_ranges.size() > 0) { + _cloud_tablet_dependency = Dependency::create_shared( + _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); + } _pending_tablets_num = cast_set(_scan_ranges.size()); _tablets.resize(_scan_ranges.size()); _tasks.reserve(_scan_ranges.size()); _sync_statistics.resize(_scan_ranges.size()); SCOPED_RAW_TIMER(&_duration_ns); + DCHECK_GT(_pending_tablets_num, 0); for (size_t i = 0; i < _scan_ranges.size(); i++) { auto* sync_stats = &_sync_statistics[i]; int64_t version = 0; From bd2b1b065e4a1c263c7534d53177400cc7994be9 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Jul 2025 13:23:21 +0800 Subject: [PATCH 13/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 77 ++++++++++----------- be/src/pipeline/exec/olap_scan_operator.h | 2 +- 2 files changed, 39 insertions(+), 40 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index f46e32ffa12205..cd4fb0c6a5f3de 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -457,50 +457,49 @@ Status OlapScanLocalState::_init_scanners(std::list* sc Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { if (config::is_cloud_mode() && !_sync_tablet) { - if (_scan_ranges.size() > 0) { + _pending_tablets_num = _scan_ranges.size(); + if (_pending_tablets_num > 0) { _cloud_tablet_dependency = Dependency::create_shared( _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); - } - _pending_tablets_num = cast_set(_scan_ranges.size()); - - _tablets.resize(_scan_ranges.size()); - _tasks.reserve(_scan_ranges.size()); - _sync_statistics.resize(_scan_ranges.size()); - SCOPED_RAW_TIMER(&_duration_ns); - DCHECK_GT(_pending_tablets_num, 0); - for (size_t i = 0; i < _scan_ranges.size(); i++) { - auto* sync_stats = &_sync_statistics[i]; - int64_t version = 0; - std::from_chars(_scan_ranges[i]->version.data(), - _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), - version); - auto task_ctx = state->get_task_execution_context(); - _tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { - auto task_lock = task_ctx.lock(); - if (task_lock == nullptr) { - return Status::OK(); - } - Defer defer([&] { - if (_pending_tablets_num.fetch_sub(1) == 1) { - _cloud_tablet_dependency->set_ready(); + _tablets.resize(_scan_ranges.size()); + _tasks.reserve(_scan_ranges.size()); + _sync_statistics.resize(_scan_ranges.size()); + SCOPED_RAW_TIMER(&_duration_ns); + DCHECK_GT(_pending_tablets_num, 0); + for (size_t i = 0; i < _scan_ranges.size(); i++) { + auto* sync_stats = &_sync_statistics[i]; + int64_t version = 0; + std::from_chars(_scan_ranges[i]->version.data(), + _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), + version); + auto task_ctx = state->get_task_execution_context(); + _tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + return Status::OK(); } + Defer defer([&] { + if (_pending_tablets_num.fetch_sub(1) == 1) { + _cloud_tablet_dependency->set_ready(); + } + }); + auto tablet = + DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); + _tablets[i] = {std::move(tablet), version}; + SyncOptions options; + options.query_version = version; + options.merge_schema = true; + RETURN_IF_ERROR(std::dynamic_pointer_cast(_tablets[i].tablet) + ->sync_rowsets(options, sync_stats)); + // FIXME(plat1ko): Avoid pointer cast + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( + *_tablets[i].tablet); + return Status::OK(); }); - auto tablet = - DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); - _tablets[i] = {std::move(tablet), version}; - SyncOptions options; - options.query_version = version; - options.merge_schema = true; - RETURN_IF_ERROR(std::dynamic_pointer_cast(_tablets[i].tablet) - ->sync_rowsets(options, sync_stats)); - // FIXME(plat1ko): Avoid pointer cast - ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( - *_tablets[i].tablet); - return Status::OK(); - }); + } + RETURN_IF_ERROR(cloud::bthread_fork_join( + _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); } - RETURN_IF_ERROR(cloud::bthread_fork_join( - _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); _sync_tablet = true; } return Status::OK(); diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index b29da164e20472..f38b1b965655fa 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -101,7 +101,7 @@ class OlapScanLocalState final : public ScanLocalState { std::vector> _tasks; int64_t _duration_ns = 0; std::shared_ptr _cloud_tablet_dependency; - std::atomic_int32_t _pending_tablets_num = 0; + std::atomic _pending_tablets_num = 0; bool _prepared = false; std::future _cloud_tablet_future; std::atomic_bool _sync_tablet = false; From 18e61f180f4b25eecdbf783a7d0b39a08439aa3f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Jul 2025 13:23:57 +0800 Subject: [PATCH 14/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index cd4fb0c6a5f3de..0d2aab3c85d654 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -514,7 +514,8 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { - if (_cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { + if (!_cloud_tablet_dependency && + _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { // Remote tablet still in-flight. return Status::OK(); } From 2e2f61a9583d32208f5ff3d0564f014a16589ca5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Jul 2025 14:45:03 +0800 Subject: [PATCH 15/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 0d2aab3c85d654..13ec5504fc4b3a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -514,7 +514,7 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { - if (!_cloud_tablet_dependency && + if (_cloud_tablet_dependency && _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { // Remote tablet still in-flight. return Status::OK(); From 26262626b0b87532253da053283f6bc364a62c23 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Jul 2025 17:53:05 +0800 Subject: [PATCH 16/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 13ec5504fc4b3a..b106cc9223dede 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -514,7 +514,7 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { _read_sources.resize(_scan_ranges.size()); if (config::is_cloud_mode()) { - if (_cloud_tablet_dependency && + if (!_cloud_tablet_dependency || _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { // Remote tablet still in-flight. return Status::OK(); From 36f2eb433af77dbd0208bb18c7a3bc3ced58ced9 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 16 Jul 2025 19:06:52 +0800 Subject: [PATCH 17/17] update --- be/src/pipeline/exec/olap_scan_operator.cpp | 12 +++++++----- be/src/pipeline/exec/olap_scan_operator.h | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index b106cc9223dede..d2f079c5ab9182 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -459,13 +459,12 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { if (config::is_cloud_mode() && !_sync_tablet) { _pending_tablets_num = _scan_ranges.size(); if (_pending_tablets_num > 0) { + _sync_cloud_tablets_watcher.start(); _cloud_tablet_dependency = Dependency::create_shared( _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); _tablets.resize(_scan_ranges.size()); _tasks.reserve(_scan_ranges.size()); _sync_statistics.resize(_scan_ranges.size()); - SCOPED_RAW_TIMER(&_duration_ns); - DCHECK_GT(_pending_tablets_num, 0); for (size_t i = 0; i < _scan_ranges.size(); i++) { auto* sync_stats = &_sync_statistics[i]; int64_t version = 0; @@ -481,6 +480,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { Defer defer([&] { if (_pending_tablets_num.fetch_sub(1) == 1) { _cloud_tablet_dependency->set_ready(); + _sync_cloud_tablets_watcher.stop(); } }); auto tablet = @@ -520,7 +520,7 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { return Status::OK(); } DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok()); - COUNTER_UPDATE(_sync_rowset_timer, _duration_ns); + COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time()); auto total_rowsets = std::accumulate( _tablets.cbegin(), _tablets.cend(), 0LL, [](long long acc, const auto& tabletWithVersion) { @@ -546,14 +546,16 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer, sync_stats.get_remote_delete_bitmap_rpc_ns); } - auto time_ms = _duration_ns / 1000 / 1000; + auto time_ms = _sync_cloud_tablets_watcher.elapsed_time_microseconds(); if (time_ms >= config::sync_rowsets_slow_threshold_ms) { DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms); DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1); LOG_WARNING("get tablet takes too long") .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id())) .tag("node_id", _parent->node_id()) - .tag("total_time", PrettyPrinter::print(_duration_ns, TUnit::TIME_NS)) + .tag("total_time", + PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(), + TUnit::TIME_NS)) .tag("num_tablets", _tablets.size()) .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value()) .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value()) diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index f38b1b965655fa..aa4ff2d6cb4f5f 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -99,7 +99,7 @@ class OlapScanLocalState final : public ScanLocalState { std::vector> _scan_ranges; std::vector _sync_statistics; std::vector> _tasks; - int64_t _duration_ns = 0; + MonotonicStopWatch _sync_cloud_tablets_watcher; std::shared_ptr _cloud_tablet_dependency; std::atomic _pending_tablets_num = 0; bool _prepared = false;