Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 7 additions & 40 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,6 @@ class RuntimePredicateWrapper {
_filter_type(type),
_filter_id(filter_id) {}

RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
const RuntimeFilterParams* params)
: _query_ctx(query_ctx),
_be_exec_version(_query_ctx->be_exec_version()),
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
_filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type,
RuntimeFilterType type, uint32_t filter_id)
: _query_ctx(query_ctx),
_be_exec_version(_query_ctx->be_exec_version()),
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
_filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
Status init(const RuntimeFilterParams* params) {
Expand Down Expand Up @@ -946,7 +928,6 @@ class RuntimePredicateWrapper {

private:
RuntimeFilterParamsContext* _state;
QueryContext* _query_ctx;
int _be_exec_version;
ObjectPool* _pool;

Expand All @@ -971,15 +952,6 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}

Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(query_ctx, pool, desc));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}

void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
context = _wrapper->_context;
}
Expand Down Expand Up @@ -1036,10 +1008,8 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr

bool IRuntimeFilter::await() {
DCHECK(is_consumer());
auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
: _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
: _state->runtime_filter_wait_time_ms;
auto execution_timeout = _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
Expand Down Expand Up @@ -1215,11 +1185,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
_probe_expr = iter->second;
}

if (_state) {
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
} else {
_wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, &params));
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));

return _wrapper->init(&params);
}

Expand Down Expand Up @@ -1247,15 +1214,15 @@ Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
return _create_wrapper(state, param, pool, wrapper);
}

Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx,
Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type),
wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type),
param->request->filter_id()));

switch (filter_type) {
Expand Down Expand Up @@ -1683,7 +1650,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
}

std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper));
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &tmp_wrapper));
auto origin_type = _wrapper->get_real_type();
RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
if (origin_type != _wrapper->get_real_type()) {
Expand Down
33 changes: 3 additions & 30 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,39 +208,13 @@ class IRuntimeFilter {
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}

IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc)
: _query_ctx(query_ctx),
_pool(pool),
_filter_id(desc->filter_id),
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
_wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false);

static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);

void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);

Expand Down Expand Up @@ -307,8 +281,8 @@ class IRuntimeFilter {
static Status create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParams* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param,
ObjectPool* pool,
static Status create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
void change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Expand Down Expand Up @@ -370,7 +344,7 @@ class IRuntimeFilter {
int32_t wait_time_ms() const {
int32_t res = 0;
if (wait_infinitely()) {
res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout;
res = _state->execution_timeout;
// Convert to ms
res *= 1000;
} else {
Expand Down Expand Up @@ -424,7 +398,6 @@ class IRuntimeFilter {
}

RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
ObjectPool* _pool = nullptr;
// _wrapper is a runtime filter function wrapper
// _wrapper should alloc from _pool
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
pquery_id->set_lo(_state->query_id.lo());

auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id();
pfragment_instance_id->set_hi(state->fragment_instance_id.hi());
pfragment_instance_id->set_lo(state->fragment_instance_id.lo());
pfragment_instance_id->set_hi(state->fragment_instance_id().hi());
pfragment_instance_id->set_lo(state->fragment_instance_id().lo());

_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
filterparams->query_id.set_hi(_runtime_state->query_id().hi);
filterparams->query_id.set_lo(_runtime_state->query_id().lo);

filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi);
filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo);
filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi);
filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo);
filterparams->be_exec_version = _runtime_state->be_exec_version();
filterparams->query_ctx = _query_ctx.get();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
_query_options(query_options) {
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this));
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
}

QueryContext::~QueryContext() {
Expand Down
44 changes: 39 additions & 5 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams
_state->runtime_filter_mgr = this;
}

RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
: _query_ctx(query_ctx) {}

Status RuntimeFilterMgr::init() {
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
ExecEnv::GetInstance()->experimental_mem_tracker());
Expand Down Expand Up @@ -120,7 +117,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets &&
desc.type == TRuntimeFilterType::BLOOM) {
// if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances
DCHECK(_query_ctx != nullptr);

iter = _consumer_map.find(key);
if (iter != _consumer_map.end()) {
Expand All @@ -131,7 +127,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
}
}
IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options,
RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options,
RuntimeFilterRole::CONSUMER, node_id, &filter,
build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
Expand Down Expand Up @@ -569,4 +565,42 @@ void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
delete entity;
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = state->enable_pipeline_exec();
params->execution_timeout = state->execution_timeout();
params->runtime_filter_mgr = state->runtime_filter_mgr();
params->exec_env = state->exec_env();
params->query_id.set_hi(state->query_id().hi);
params->query_id.set_lo(state->query_id().lo);

params->_fragment_instance_id.set_hi(state->fragment_instance_id().hi);
params->_fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = state->be_exec_version();
params->query_ctx = state->get_query_ctx();
return params;
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* query_ctx) {
RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = query_ctx->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = query_ctx->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = query_ctx->enable_pipeline_exec();
params->execution_timeout = query_ctx->execution_timeout();
params->runtime_filter_mgr = query_ctx->runtime_filter_mgr();
params->exec_env = query_ctx->exec_env();
params->query_id.set_hi(query_ctx->query_id().hi);
params->query_id.set_lo(query_ctx->query_id().lo);

// params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
// params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = query_ctx->be_exec_version();
params->query_ctx = query_ctx;
params->_obj_pool = &query_ctx->obj_pool;
params->_is_global = true;
return params;
}

} // namespace doris
38 changes: 35 additions & 3 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum class RuntimeFilterRole;
class RuntimePredicateWrapper;
class QueryContext;
struct RuntimeFilterParamsContext;
class ExecEnv;

/// producer:
/// Filter filter;
Expand All @@ -68,8 +69,6 @@ class RuntimeFilterMgr {
public:
RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state);

RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);

~RuntimeFilterMgr() = default;

Status init();
Expand Down Expand Up @@ -109,7 +108,6 @@ class RuntimeFilterMgr {
std::map<int32_t, IRuntimeFilter*> _producer_map;

RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
std::unique_ptr<MemTracker> _tracker;
ObjectPool _pool;

Expand Down Expand Up @@ -227,4 +225,38 @@ using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMerge
void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
RuntimeFilterMergeControllerEntity* entity);

//There are two types of runtime filters:
// one is global, originating from QueryContext,
// and the other is local, originating from RuntimeState.
// In practice, we have already distinguished between them through UpdateRuntimeFilterParamsV2/V1.
// RuntimeState/QueryContext is only used to store runtime_filter_wait_time_ms and enable_pipeline_exec...

/// TODO: Consider adding checks for global/local.
struct RuntimeFilterParamsContext {
RuntimeFilterParamsContext() = default;
static RuntimeFilterParamsContext* create(RuntimeState* state);
static RuntimeFilterParamsContext* create(QueryContext* query_ctx);

bool runtime_filter_wait_infinitely;
int32_t runtime_filter_wait_time_ms;
bool enable_pipeline_exec;
int32_t execution_timeout;
RuntimeFilterMgr* runtime_filter_mgr;
ExecEnv* exec_env;
PUniqueId query_id;
PUniqueId _fragment_instance_id;
int be_exec_version;
QueryContext* query_ctx;
QueryContext* get_query_ctx() const { return query_ctx; }
ObjectPool* _obj_pool;
bool _is_global = false;
PUniqueId fragment_instance_id() const {
DCHECK(!_is_global);
return _fragment_instance_id;
}
ObjectPool* obj_pool() const {
DCHECK(_is_global);
return _obj_pool;
}
};
} // namespace doris
17 changes: 0 additions & 17 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,4 @@ bool RuntimeState::enable_page_cache() const {
(_query_options.__isset.enable_page_cache && _query_options.enable_page_cache);
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = state->enable_pipeline_exec();
params->execution_timeout = state->execution_timeout();
params->runtime_filter_mgr = state->runtime_filter_mgr();
params->exec_env = state->exec_env();
params->query_id.set_hi(state->query_id().hi);
params->query_id.set_lo(state->query_id().lo);

params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = state->be_exec_version();
params->query_ctx = state->get_query_ctx();
return params;
}
} // end namespace doris
18 changes: 0 additions & 18 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -662,24 +662,6 @@ class RuntimeState {
RuntimeState(const RuntimeState&);
};

// from runtime state
struct RuntimeFilterParamsContext {
RuntimeFilterParamsContext() = default;
static RuntimeFilterParamsContext* create(RuntimeState* state);

bool runtime_filter_wait_infinitely;
int32_t runtime_filter_wait_time_ms;
bool enable_pipeline_exec;
int32_t execution_timeout;
RuntimeFilterMgr* runtime_filter_mgr;
ExecEnv* exec_env;
PUniqueId query_id;
PUniqueId fragment_instance_id;
int be_exec_version;
QueryContext* query_ctx;
QueryContext* get_query_ctx() const { return query_ctx; }
};

#define RETURN_IF_CANCELLED(state) \
do { \
if (UNLIKELY((state)->is_cancelled())) return Status::Cancelled("Cancelled"); \
Expand Down