diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d63570ff4dffa9..7f57fbe9459216 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1050,9 +1050,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5"); // enable java udf and jdbc scannode DEFINE_Bool(enable_java_support, "true"); -// enable prefetch tablets before opening -DEFINE_mBool(enable_prefetch_tablet, "true"); - // Set config randomly to check more issues in github workflow DEFINE_Bool(enable_fuzzy_mode, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a867bfccfada96..65a4bcba0d1ee6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1093,9 +1093,6 @@ DECLARE_mInt32(segcompaction_num_threads); // enable java udf and jdbc scannode DECLARE_Bool(enable_java_support); -// enable prefetch tablets before opening -DECLARE_mBool(enable_prefetch_tablet); - // Set config randomly to check more issues in github workflow DECLARE_Bool(enable_fuzzy_mode); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 16823757fdf869..2de1c09ffe8c6a 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -50,7 +50,7 @@ class MultiCastDataStreamSourceLocalState final Status close(RuntimeState* state) override; friend class MultiCastDataStreamerSourceOperatorX; - std::vector filter_dependencies() override { + std::vector execution_dependencies() override { if (_filter_dependencies.empty()) { return {}; } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index f9c994c56faf51..d2f079c5ab9182 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -46,6 +46,12 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" +Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + RETURN_IF_ERROR(_sync_cloud_tablets(state)); + return Status::OK(); +} + Status OlapScanLocalState::_init_profile() { RETURN_IF_ERROR(ScanLocalState::_init_profile()); // Rows read from storage. @@ -356,7 +362,6 @@ Status OlapScanLocalState::_init_scanners(std::list* sc bool has_cpu_limit = state()->query_options().__isset.resource_limit && state()->query_options().resource_limit.__isset.cpu_limit; - RETURN_IF_ERROR(hold_tablets()); if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && p._push_down_agg_type == TPushAggOp::NONE && (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { @@ -450,30 +455,34 @@ Status OlapScanLocalState::_init_scanners(std::list* sc return Status::OK(); } -Status OlapScanLocalState::hold_tablets() { - if (!_tablets.empty()) { - return Status::OK(); - } - - MonotonicStopWatch timer; - timer.start(); - _tablets.resize(_scan_ranges.size()); - _read_sources.resize(_scan_ranges.size()); - - if (config::is_cloud_mode()) { - std::vector sync_statistics(_scan_ranges.size()); - std::vector> tasks {}; - tasks.reserve(_scan_ranges.size()); - int64_t duration_ns {0}; - { - SCOPED_RAW_TIMER(&duration_ns); +Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { + if (config::is_cloud_mode() && !_sync_tablet) { + _pending_tablets_num = _scan_ranges.size(); + if (_pending_tablets_num > 0) { + _sync_cloud_tablets_watcher.start(); + _cloud_tablet_dependency = Dependency::create_shared( + _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); + _tablets.resize(_scan_ranges.size()); + _tasks.reserve(_scan_ranges.size()); + _sync_statistics.resize(_scan_ranges.size()); for (size_t i = 0; i < _scan_ranges.size(); i++) { - auto* sync_stats = &sync_statistics[i]; + auto* sync_stats = &_sync_statistics[i]; int64_t version = 0; std::from_chars(_scan_ranges[i]->version.data(), _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); - tasks.emplace_back([this, sync_stats, version, i]() { + auto task_ctx = state->get_task_execution_context(); + _tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + return Status::OK(); + } + Defer defer([&] { + if (_pending_tablets_num.fetch_sub(1) == 1) { + _cloud_tablet_dependency->set_ready(); + _sync_cloud_tablets_watcher.stop(); + } + }); auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); _tablets[i] = {std::move(tablet), version}; @@ -488,17 +497,37 @@ Status OlapScanLocalState::hold_tablets() { return Status::OK(); }); } - RETURN_IF_ERROR( - cloud::bthread_fork_join(tasks, config::init_scanner_sync_rowsets_parallelism)); + RETURN_IF_ERROR(cloud::bthread_fork_join( + _tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future)); + } + _sync_tablet = true; + } + return Status::OK(); +} + +Status OlapScanLocalState::prepare(RuntimeState* state) { + if (_prepared) { + return Status::OK(); + } + MonotonicStopWatch timer; + timer.start(); + _read_sources.resize(_scan_ranges.size()); + + if (config::is_cloud_mode()) { + if (!_cloud_tablet_dependency || + _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { + // Remote tablet still in-flight. + return Status::OK(); } - COUNTER_UPDATE(_sync_rowset_timer, duration_ns); + DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok()); + COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time()); auto total_rowsets = std::accumulate( _tablets.cbegin(), _tablets.cend(), 0LL, [](long long acc, const auto& tabletWithVersion) { return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size(); }); COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets); - for (const auto& sync_stats : sync_statistics) { + for (const auto& sync_stats : _sync_statistics) { COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit); COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss); COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer, @@ -517,14 +546,16 @@ Status OlapScanLocalState::hold_tablets() { COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer, sync_stats.get_remote_delete_bitmap_rpc_ns); } - auto time_ms = duration_ns / 1000 / 1000; + auto time_ms = _sync_cloud_tablets_watcher.elapsed_time_microseconds(); if (time_ms >= config::sync_rowsets_slow_threshold_ms) { DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms); DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1); LOG_WARNING("get tablet takes too long") .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id())) .tag("node_id", _parent->node_id()) - .tag("total_time", PrettyPrinter::print(duration_ns, TUnit::TIME_NS)) + .tag("total_time", + PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(), + TUnit::TIME_NS)) .tag("num_tablets", _tablets.size()) .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value()) .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value()) @@ -550,8 +581,8 @@ Status OlapScanLocalState::hold_tablets() { _sync_rowset_get_remote_delete_bitmap_rpc_timer->value(), TUnit::TIME_NS)); } - } else { + _tablets.resize(_scan_ranges.size()); for (size_t i = 0; i < _scan_ranges.size(); i++) { int64_t version = 0; std::from_chars(_scan_ranges[i]->version.data(), @@ -585,6 +616,7 @@ Status OlapScanLocalState::hold_tablets() { cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(), _scan_ranges.size()); } + _prepared = true; return Status::OK(); } @@ -767,10 +799,5 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i } } -Status OlapScanOperatorX::hold_tablets(RuntimeState* state) { - auto& local_state = ScanOperatorX::get_local_state(state); - return local_state.hold_tablets(); -} - #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index f77d914579d3d6..aa4ff2d6cb4f5f 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -21,29 +21,28 @@ #include +#include "cloud/cloud_tablet.h" #include "common/status.h" #include "olap/tablet_reader.h" #include "operator.h" #include "pipeline/exec/scan_operator.h" -namespace doris { -#include "common/compile_check_begin.h" - -namespace vectorized { +namespace doris::vectorized { class OlapScanner; -} -} // namespace doris +} // namespace doris::vectorized namespace doris::pipeline { +#include "common/compile_check_begin.h" class OlapScanOperatorX; class OlapScanLocalState final : public ScanLocalState { public: using Parent = OlapScanOperatorX; + using Base = ScanLocalState; ENABLE_FACTORY_CREATOR(OlapScanLocalState); - OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalState(state, parent) {} - + OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status prepare(RuntimeState* state) override; TOlapScanNode& olap_scan_node() const; std::string name_suffix() const override { @@ -51,11 +50,19 @@ class OlapScanLocalState final : public ScanLocalState { std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()), olap_scan_node().table_name); } - Status hold_tablets(); + std::vector execution_dependencies() override { + if (!_cloud_tablet_dependency) { + return Base::execution_dependencies(); + } + std::vector res = Base::execution_dependencies(); + res.push_back(_cloud_tablet_dependency.get()); + return res; + } private: friend class vectorized::OlapScanner; + Status _sync_cloud_tablets(RuntimeState* state); void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override; Status _init_profile() override; @@ -90,6 +97,14 @@ class OlapScanLocalState final : public ScanLocalState { Status _build_key_ranges_and_filters(); std::vector> _scan_ranges; + std::vector _sync_statistics; + std::vector> _tasks; + MonotonicStopWatch _sync_cloud_tablets_watcher; + std::shared_ptr _cloud_tablet_dependency; + std::atomic _pending_tablets_num = 0; + bool _prepared = false; + std::future _cloud_tablet_future; + std::atomic_bool _sync_tablet = false; std::vector> _cond_ranges; OlapScanKeys _scan_keys; std::vector> _olap_filters; @@ -234,7 +249,6 @@ class OlapScanOperatorX final : public ScanOperatorX { OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& cache_param); - Status hold_tablets(RuntimeState* state) override; private: friend class OlapScanLocalState; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 491bdf9a06e93e..7326dc83ba9de8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -171,6 +171,11 @@ class PipelineXLocalStateBase { // Do initialization. This step should be executed only once and in bthread, so we can do some // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX) virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0; + // Make sure all resources are ready before execution. For example, remote tablets should be + // loaded to local storage. + // This is called by execution pthread and different from `Operator::prepare` which is called + // by bthread. + virtual Status prepare(RuntimeState* state) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; @@ -201,7 +206,7 @@ class PipelineXLocalStateBase { virtual Dependency* finishdependency() { return nullptr; } virtual Dependency* spill_dependency() const { return nullptr; } // override in Scan MultiCastSink - virtual std::vector filter_dependencies() { return {}; } + virtual std::vector execution_dependencies() { return {}; } Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts, vectorized::Block* block, size_t column_to_keep); @@ -261,6 +266,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase { ~PipelineXLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override; virtual std::string name_suffix() const; @@ -440,6 +446,7 @@ class PipelineXSinkLocalStateBase { // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX) virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0; + virtual Status prepare(RuntimeState* state) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; @@ -518,6 +525,7 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase { Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } Status terminate(RuntimeState* state) override; @@ -830,8 +838,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } - // Tablets should be hold before open phase. - [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); } Status prepare(RuntimeState* state) override; Status terminate(RuntimeState* state) override; @@ -1116,7 +1122,9 @@ class DummyOperatorLocalState final : public PipelineXLocalState dependencies() const override { return {_tmp_dependency.get()}; } - std::vector filter_dependencies() override { return {_filter_dependency.get()}; } + std::vector execution_dependencies() override { + return {_filter_dependency.get()}; + } Dependency* spill_dependency() const override { return _spill_dependency.get(); } private: diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index a053fa66909a28..d3e043032c2313 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -80,7 +80,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> { virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; virtual void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) = 0; - virtual TPushAggOp::type get_push_down_agg_type() = 0; virtual int64_t get_push_down_count() = 0; @@ -156,15 +155,13 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; - std::vector filter_dependencies() override { + std::vector execution_dependencies() override { if (_filter_dependencies.empty()) { return {}; } - std::vector res; - res.resize(_filter_dependencies.size()); - for (size_t i = 0; i < _filter_dependencies.size(); i++) { - res[i] = _filter_dependencies[i].get(); - } + std::vector res(_filter_dependencies.size()); + std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(), + [](DependencySPtr dep) { return dep.get(); }); return res; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 1629f3c504468e..c78a8b9acb5c40 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -78,9 +78,9 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState _sink(pipeline->sink_shared_pointer()), _shared_state_map(std::move(shared_state_map)), _task_idx(task_idx), - _execution_dep(state->get_query_ctx()->get_execution_dependency()), _memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()), _pipeline_name(_pipeline->name()) { + _execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency()); if (!_shared_state_map.contains(_sink->dests_id().front())) { auto shared_state = _sink->create_shared_state(); if (shared_state) { @@ -119,13 +119,11 @@ Status PipelineTask::prepare(const std::vector& scan_range, co parent_profile = _state->get_local_state(op->operator_id())->profile(); } { - std::vector filter_dependencies; - const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); - std::copy(deps.begin(), deps.end(), - std::inserter(filter_dependencies, filter_dependencies.end())); - + const auto& deps = + _state->get_local_state(_source->operator_id())->execution_dependencies(); std::unique_lock lc(_dependency_lock); - filter_dependencies.swap(_filter_dependencies); + std::copy(deps.begin(), deps.end(), + std::inserter(_execution_dependencies, _execution_dependencies.end())); } if (auto fragment = _fragment_context.lock()) { if (fragment->get_query_ctx()->is_cancelled()) { @@ -259,14 +257,24 @@ Status PipelineTask::_open() { return Status::OK(); } +Status PipelineTask::_prepare() { + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + for (auto& o : _operators) { + RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->prepare(_state)); + } + RETURN_IF_ERROR(_state->get_sink_local_state()->prepare(_state)); + return Status::OK(); +} + bool PipelineTask::_wait_to_start() { // Before task starting, we should make sure // 1. Execution dependency is ready (which is controlled by FE 2-phase commit) // 2. Runtime filter dependencies are ready - return _execution_dep->is_blocked_by(shared_from_this()) || - std::any_of( - _filter_dependencies.begin(), _filter_dependencies.end(), - [&](Dependency* dep) -> bool { return dep->is_blocked_by(shared_from_this()); }); + // 3. All tablets are loaded into local storage + return std::any_of( + _execution_dependencies.begin(), _execution_dependencies.end(), + [&](Dependency* dep) -> bool { return dep->is_blocked_by(shared_from_this()); }); } bool PipelineTask::_is_pending_finish() { @@ -320,8 +328,6 @@ void PipelineTask::terminate() { DCHECK(_wake_up_early || fragment->is_canceled()); std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(), [&](Dependency* dep) { dep->set_always_ready(); }); - std::for_each(_filter_dependencies.begin(), _filter_dependencies.end(), - [&](Dependency* dep) { dep->set_always_ready(); }); std::for_each(_write_dependencies.begin(), _write_dependencies.end(), [&](Dependency* dep) { dep->set_always_ready(); }); std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(), @@ -331,7 +337,9 @@ void PipelineTask::terminate() { std::for_each(deps.begin(), deps.end(), [&](Dependency* dep) { dep->set_always_ready(); }); }); - _execution_dep->set_ready(); + // All `_execution_deps` will never be set blocking from ready. So we just set ready here. + std::for_each(_execution_dependencies.begin(), _execution_dependencies.end(), + [&](Dependency* dep) { dep->set_ready(); }); _memory_sufficient_dependency->set_ready(); } } @@ -400,17 +408,18 @@ Status PipelineTask::execute(bool* done) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); + if (!_wake_up_early) { + RETURN_IF_ERROR(_prepare()); + } DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", { Status status = Status::Error("fault_inject pipeline_task execute failed"); return status; }); // `_wake_up_early` must be after `_wait_to_start()` if (_wait_to_start() || _wake_up_early) { - if (config::enable_prefetch_tablet) { - RETURN_IF_ERROR(_source->hold_tablets(_state)); - } return Status::OK(); } + RETURN_IF_ERROR(_prepare()); // The status must be runnable if (!_opened && !fragment_context->is_canceled()) { @@ -740,10 +749,10 @@ std::string PipelineTask::debug_string() { _write_dependencies[j]->debug_string(i + 1)); } - fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n"); - for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "\nExecution Dependency Information: \n"); + for (size_t j = 0; j < _execution_dependencies.size(); j++, i++) { fmt::format_to(debug_string_buffer, "{}. {}\n", i, - _filter_dependencies[j]->debug_string(i + 1)); + _execution_dependencies[j]->debug_string(i + 1)); } fmt::format_to(debug_string_buffer, "\nSpill Dependency Information: \n"); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 9c65b24d8cd32f..052b4a8017a42c 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -182,6 +182,7 @@ class PipelineTask : public std::enable_shared_from_this { void _init_profile(); void _fresh_profile_counter(); Status _open(); + Status _prepare(); // Operator `op` try to reserve memory before executing. Return false if reserve failed // otherwise return true. @@ -238,7 +239,7 @@ class PipelineTask : public std::enable_shared_from_this { std::vector _spill_dependencies; std::vector _write_dependencies; std::vector _finish_dependencies; - std::vector _filter_dependencies; + std::vector _execution_dependencies; // All shared states of this pipeline task. std::map> _op_shared_states; @@ -253,7 +254,6 @@ class PipelineTask : public std::enable_shared_from_this { unsigned long long _exec_time_slice = config::pipeline_task_exec_time_slice * NANOS_PER_MILLIS; Dependency* _blocked_dep = nullptr; - Dependency* _execution_dep = nullptr; Dependency* _memory_sufficient_dependency; std::mutex _dependency_lock; diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index 94ebf2a111af02..f5a08fd56e6606 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -318,7 +318,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies() @@ -340,7 +340,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { { // task is blocked by filter dependency. _query_ctx->get_execution_dependency()->set_ready(); - task->_filter_dependencies.front()->block(); + task->_execution_dependencies.back()->block(); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); bool done = false; EXPECT_TRUE(task->execute(&done).ok()); @@ -348,8 +348,8 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { EXPECT_FALSE(done); EXPECT_FALSE(task->_wake_up_early); EXPECT_FALSE(task->_opened); - EXPECT_FALSE(task->_filter_dependencies.front()->ready()); - EXPECT_FALSE(task->_filter_dependencies.front()->_blocked_task.empty()); + EXPECT_FALSE(task->_execution_dependencies.back()->ready()); + EXPECT_FALSE(task->_execution_dependencies.back()->_blocked_task.empty()); EXPECT_TRUE(task->_read_dependencies.empty()); EXPECT_TRUE(task->_write_dependencies.empty()); EXPECT_TRUE(task->_finish_dependencies.empty()); @@ -358,7 +358,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { } { // `open` phase. And then task is blocked by read dependency. - task->_filter_dependencies.front()->set_ready(); + task->_execution_dependencies.back()->set_ready(); read_dep->block(); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); bool done = false; @@ -446,7 +446,7 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -511,7 +511,7 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } for (int i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) { auto target = (PipelineTask::State)i; @@ -556,7 +556,7 @@ TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -637,7 +637,7 @@ TEST_F(PipelineTaskTest, TEST_SINK_EOF) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); } _query_ctx->get_execution_dependency()->set_ready(); { @@ -718,7 +718,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies() @@ -854,7 +854,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) { TDataSink tsink; EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); - EXPECT_FALSE(task->_filter_dependencies.empty()); + EXPECT_GT(task->_execution_dependencies.size(), 1); read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) .value() ->dependencies()