Skip to content
3 changes: 0 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1050,9 +1050,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");

// enable prefetch tablets before opening
DEFINE_mBool(enable_prefetch_tablet, "true");

// Set config randomly to check more issues in github workflow
DEFINE_Bool(enable_fuzzy_mode, "false");

Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1093,9 +1093,6 @@ DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);

// enable prefetch tablets before opening
DECLARE_mBool(enable_prefetch_tablet);

// Set config randomly to check more issues in github workflow
DECLARE_Bool(enable_fuzzy_mode);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MultiCastDataStreamSourceLocalState final
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;

std::vector<Dependency*> filter_dependencies() override {
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
Expand Down
91 changes: 59 additions & 32 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
RETURN_IF_ERROR(_sync_cloud_tablets(state));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个_sync_cloud_tablets 实际是一个执行逻辑,但是我们把他放到了一个初始化的逻辑里,可能会有一些不可预期的问题

return Status::OK();
}

Status OlapScanLocalState::_init_profile() {
RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile());
// Rows read from storage.
Expand Down Expand Up @@ -356,7 +362,6 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;

RETURN_IF_ERROR(hold_tablets());
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
Expand Down Expand Up @@ -450,30 +455,34 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
return Status::OK();
}

Status OlapScanLocalState::hold_tablets() {
if (!_tablets.empty()) {
return Status::OK();
}

MonotonicStopWatch timer;
timer.start();
_tablets.resize(_scan_ranges.size());
_read_sources.resize(_scan_ranges.size());

if (config::is_cloud_mode()) {
std::vector<SyncRowsetStats> sync_statistics(_scan_ranges.size());
std::vector<std::function<Status()>> tasks {};
tasks.reserve(_scan_ranges.size());
int64_t duration_ns {0};
{
SCOPED_RAW_TIMER(&duration_ns);
Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
if (config::is_cloud_mode() && !_sync_tablet) {
_pending_tablets_num = _scan_ranges.size();
if (_pending_tablets_num > 0) {
_sync_cloud_tablets_watcher.start();
_cloud_tablet_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");
_tablets.resize(_scan_ranges.size());
_tasks.reserve(_scan_ranges.size());
_sync_statistics.resize(_scan_ranges.size());
for (size_t i = 0; i < _scan_ranges.size(); i++) {
auto* sync_stats = &sync_statistics[i];
auto* sync_stats = &_sync_statistics[i];
int64_t version = 0;
std::from_chars(_scan_ranges[i]->version.data(),
_scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
version);
tasks.emplace_back([this, sync_stats, version, i]() {
auto task_ctx = state->get_task_execution_context();
_tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return Status::OK();
}
Defer defer([&] {
if (_pending_tablets_num.fetch_sub(1) == 1) {
_cloud_tablet_dependency->set_ready();
_sync_cloud_tablets_watcher.stop();
}
});
auto tablet =
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
_tablets[i] = {std::move(tablet), version};
Expand All @@ -488,17 +497,37 @@ Status OlapScanLocalState::hold_tablets() {
return Status::OK();
});
}
RETURN_IF_ERROR(
cloud::bthread_fork_join(tasks, config::init_scanner_sync_rowsets_parallelism));
RETURN_IF_ERROR(cloud::bthread_fork_join(
_tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future));
}
_sync_tablet = true;
}
return Status::OK();
}

Status OlapScanLocalState::prepare(RuntimeState* state) {
if (_prepared) {
return Status::OK();
}
MonotonicStopWatch timer;
timer.start();
_read_sources.resize(_scan_ranges.size());

if (config::is_cloud_mode()) {
if (!_cloud_tablet_dependency ||
_cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) {
// Remote tablet still in-flight.
return Status::OK();
}
COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok());
COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time());
auto total_rowsets = std::accumulate(
_tablets.cbegin(), _tablets.cend(), 0LL,
[](long long acc, const auto& tabletWithVersion) {
return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size();
});
COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets);
for (const auto& sync_stats : sync_statistics) {
for (const auto& sync_stats : _sync_statistics) {
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit);
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss);
COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer,
Expand All @@ -517,14 +546,16 @@ Status OlapScanLocalState::hold_tablets() {
COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer,
sync_stats.get_remote_delete_bitmap_rpc_ns);
}
auto time_ms = duration_ns / 1000 / 1000;
auto time_ms = _sync_cloud_tablets_watcher.elapsed_time_microseconds();
if (time_ms >= config::sync_rowsets_slow_threshold_ms) {
DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms);
DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1);
LOG_WARNING("get tablet takes too long")
.tag("query_id", print_id(PipelineXLocalState<>::_state->query_id()))
.tag("node_id", _parent->node_id())
.tag("total_time", PrettyPrinter::print(duration_ns, TUnit::TIME_NS))
.tag("total_time",
PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(),
TUnit::TIME_NS))
.tag("num_tablets", _tablets.size())
.tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value())
.tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value())
Expand All @@ -550,8 +581,8 @@ Status OlapScanLocalState::hold_tablets() {
_sync_rowset_get_remote_delete_bitmap_rpc_timer->value(),
TUnit::TIME_NS));
}

} else {
_tablets.resize(_scan_ranges.size());
for (size_t i = 0; i < _scan_ranges.size(); i++) {
int64_t version = 0;
std::from_chars(_scan_ranges[i]->version.data(),
Expand Down Expand Up @@ -585,6 +616,7 @@ Status OlapScanLocalState::hold_tablets() {
cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
_scan_ranges.size());
}
_prepared = true;
return Status::OK();
}

Expand Down Expand Up @@ -767,10 +799,5 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
}
}

Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state);
return local_state.hold_tablets();
}

#include "common/compile_check_end.h"
} // namespace doris::pipeline
36 changes: 25 additions & 11 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,48 @@

#include <string>

#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "olap/tablet_reader.h"
#include "operator.h"
#include "pipeline/exec/scan_operator.h"

namespace doris {
#include "common/compile_check_begin.h"

namespace vectorized {
namespace doris::vectorized {
class OlapScanner;
}
} // namespace doris
} // namespace doris::vectorized

namespace doris::pipeline {
#include "common/compile_check_begin.h"

class OlapScanOperatorX;
class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
public:
using Parent = OlapScanOperatorX;
using Base = ScanLocalState<OlapScanLocalState>;
ENABLE_FACTORY_CREATOR(OlapScanLocalState);
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}

OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status prepare(RuntimeState* state) override;
TOlapScanNode& olap_scan_node() const;

std::string name_suffix() const override {
return fmt::format(" (id={}. nereids_id={}. table name = {})",
std::to_string(_parent->node_id()),
std::to_string(_parent->nereids_id()), olap_scan_node().table_name);
}
Status hold_tablets();
std::vector<Dependency*> execution_dependencies() override {
if (!_cloud_tablet_dependency) {
return Base::execution_dependencies();
}
std::vector<Dependency*> res = Base::execution_dependencies();
res.push_back(_cloud_tablet_dependency.get());
return res;
}

private:
friend class vectorized::OlapScanner;

Status _sync_cloud_tablets(RuntimeState* state);
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_profile() override;
Expand Down Expand Up @@ -90,6 +97,14 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
Status _build_key_ranges_and_filters();

std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
std::vector<SyncRowsetStats> _sync_statistics;
std::vector<std::function<Status()>> _tasks;
MonotonicStopWatch _sync_cloud_tablets_watcher;
std::shared_ptr<Dependency> _cloud_tablet_dependency;
std::atomic<size_t> _pending_tablets_num = 0;
bool _prepared = false;
std::future<Status> _cloud_tablet_future;
std::atomic_bool _sync_tablet = false;
std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
OlapScanKeys _scan_keys;
std::vector<FilterOlapParam<TCondition>> _olap_filters;
Expand Down Expand Up @@ -234,7 +249,6 @@ class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs, int parallel_tasks,
const TQueryCacheParam& cache_param);
Status hold_tablets(RuntimeState* state) override;

private:
friend class OlapScanLocalState;
Expand Down
16 changes: 12 additions & 4 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ class PipelineXLocalStateBase {
// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
// Make sure all resources are ready before execution. For example, remote tablets should be
// loaded to local storage.
// This is called by execution pthread and different from `Operator::prepare` which is called
// by bthread.
virtual Status prepare(RuntimeState* state) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
Expand Down Expand Up @@ -201,7 +206,7 @@ class PipelineXLocalStateBase {
virtual Dependency* finishdependency() { return nullptr; }
virtual Dependency* spill_dependency() const { return nullptr; }
// override in Scan MultiCastSink
virtual std::vector<Dependency*> filter_dependencies() { return {}; }
virtual std::vector<Dependency*> execution_dependencies() { return {}; }

Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
vectorized::Block* block, size_t column_to_keep);
Expand Down Expand Up @@ -261,6 +266,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
~PipelineXLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override;

virtual std::string name_suffix() const;
Expand Down Expand Up @@ -440,6 +446,7 @@ class PipelineXSinkLocalStateBase {
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;

virtual Status prepare(RuntimeState* state) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
Expand Down Expand Up @@ -518,6 +525,7 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }

Status terminate(RuntimeState* state) override;
Expand Down Expand Up @@ -830,8 +838,6 @@ class OperatorXBase : public OperatorBase {
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }

// Tablets should be hold before open phase.
[[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); }
Status prepare(RuntimeState* state) override;

Status terminate(RuntimeState* state) override;
Expand Down Expand Up @@ -1116,7 +1122,9 @@ class DummyOperatorLocalState final : public PipelineXLocalState<FakeSharedState
~DummyOperatorLocalState() = default;

std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; }
std::vector<Dependency*> filter_dependencies() override { return {_filter_dependency.get()}; }
std::vector<Dependency*> execution_dependencies() override {
return {_filter_dependency.get()};
}
Dependency* spill_dependency() const override { return _spill_dependency.get(); }

private:
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) = 0;

virtual TPushAggOp::type get_push_down_agg_type() = 0;

virtual int64_t get_push_down_count() = 0;
Expand Down Expand Up @@ -156,15 +155,13 @@ class ScanLocalState : public ScanLocalStateBase {

int64_t get_push_down_count() override;

std::vector<Dependency*> filter_dependencies() override {
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
std::vector<Dependency*> res(_filter_dependencies.size());
std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
[](DependencySPtr dep) { return dep.get(); });
return res;
}

Expand Down
Loading
Loading