Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ set(RUNTIME_FILES
mysql_result_writer.cpp
memory/system_allocator.cpp
memory/chunk_allocator.cpp
fold_constant_mgr.cpp
cache/result_node.cpp
cache/result_cache.cpp
odbc_table_sink.cpp
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class RoutineLoadTaskExecutor;
class SmallFileMgr;
class FileBlockManager;
class PluginMgr;
class FoldConstantMgr;

class BackendServiceClient;
class FrontendServiceClient;
Expand Down Expand Up @@ -128,6 +129,7 @@ class ExecEnv {
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
FoldConstantMgr* fold_constant_mgr() { return _fold_constant_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }
void set_store_paths(const std::vector<StorePath>& paths) { _store_paths = paths; }
Expand Down Expand Up @@ -179,6 +181,7 @@ class ExecEnv {
LoadPathMgr* _load_path_mgr = nullptr;
DiskIoMgr* _disk_io_mgr = nullptr;
TmpFileMgr* _tmp_file_mgr = nullptr;
FoldConstantMgr* _fold_constant_mgr = nullptr;

BfdParser* _bfd_parser = nullptr;
BrokerMgr* _broker_mgr = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "runtime/etl_job_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/external_scan_context_mgr.h"
#include "runtime/fold_constant_mgr.h"
#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
#include "runtime/load_channel_mgr.h"
Expand Down Expand Up @@ -98,6 +99,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
_fold_constant_mgr = new FoldConstantMgr(this);
_master_info = new TMasterInfo();
_etl_job_mgr = new EtlJobMgr(this);
_load_path_mgr = new LoadPathMgr(this);
Expand Down Expand Up @@ -250,6 +252,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_etl_job_mgr);
SAFE_DELETE(_master_info);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_fold_constant_mgr);
SAFE_DELETE(_cgroups_mgr);
SAFE_DELETE(_etl_thread_pool);
SAFE_DELETE(_thread_pool);
Expand Down
213 changes: 213 additions & 0 deletions be/src/runtime/fold_constant_mgr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <map>
#include <string>

#include "runtime/fold_constant_mgr.h"
#include "runtime/tuple_row.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/mem_tracker.h"
#include "exprs/expr_context.h"
#include "exprs/expr.h"
#include "common/object_pool.h"
#include "common/status.h"

#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/PaloInternalService_types.h"

using std::string;
using std::map;

namespace doris {

TUniqueId FoldConstantMgr::_dummy_id;

FoldConstantMgr::FoldConstantMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _pool(){

}

Status FoldConstantMgr::fold_constant_expr(
const TFoldConstantParams& params, PConstantExprResult* response) {
auto expr_map = params.expr_map;
auto expr_result_map = response->mutable_expr_result_map();

TQueryGlobals query_globals = params.query_globals;

// init
Status status = init(query_globals);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return status;
}

for (auto m : expr_map) {
PExprResultMap pexpr_result_map;
for (auto n : m.second) {
ExprContext* ctx = nullptr;
TExpr& texpr = n.second;
// create expr tree from TExpr
RETURN_IF_ERROR(Expr::create_expr_tree(&_pool, texpr, &ctx));
// prepare and open context
status = prepare_and_open(ctx);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return status;
}

TupleRow* row = nullptr;
// calc expr
void* src = ctx->get_value(row);
PrimitiveType root_type = ctx->root()->type().type;
// covert to thrift type
TPrimitiveType::type t_type = doris::to_thrift(root_type);

// collect result
PExprResult expr_result;
string result;
if (src == nullptr) {
expr_result.set_success(false);
} else {
expr_result.set_success(true);
result = get_result(src, ctx->root()->type().type);
}

expr_result.set_content(result);
expr_result.mutable_type()->set_type(t_type);

pexpr_result_map.mutable_map()->insert({n.first, expr_result});

// close context expr
ctx->close(_runtime_state.get());
}

expr_result_map->insert({m.first, pexpr_result_map});
}

return Status::OK();

}

Status FoldConstantMgr::init(TQueryGlobals query_globals) {
// init runtime state, runtime profile
TPlanFragmentExecParams params;
params.fragment_instance_id = FoldConstantMgr::_dummy_id;
params.query_id = FoldConstantMgr::_dummy_id;
TExecPlanFragmentParams fragment_params;
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
TQueryOptions query_options;
_runtime_state.reset(new RuntimeState(fragment_params.params, query_options, query_globals,
ExecEnv::GetInstance()));
DescriptorTbl* desc_tbl = NULL;
TDescriptorTable* t_desc_tbl = new TDescriptorTable();
Status status = DescriptorTbl::create(_runtime_state->obj_pool(), *t_desc_tbl, &desc_tbl);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg();
return Status::Uninitialized(status.get_error_msg());
}
_runtime_state->set_desc_tbl(desc_tbl);
status = _runtime_state->init_mem_trackers(FoldConstantMgr::_dummy_id);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return Status::Uninitialized(status.get_error_msg());
}

_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("FoldConstantExpr");
_mem_tracker = MemTracker::CreateTracker(-1, "FoldConstantExpr", _runtime_state->instance_mem_tracker());
_mem_pool.reset(new MemPool(_mem_tracker.get()));

return Status::OK();
}

Status FoldConstantMgr::prepare_and_open(ExprContext* ctx) {
RowDescriptor* desc = new RowDescriptor();
ctx -> prepare(_runtime_state.get(), *desc, _mem_tracker);
return ctx -> open(_runtime_state.get());
}

string FoldConstantMgr::get_result(void* src, PrimitiveType slot_type){
switch (slot_type) {
case TYPE_NULL: {
return NULL;
}
case TYPE_BOOLEAN: {
bool val = *reinterpret_cast<const bool*>(src);
return val ? "true" : "false";
}
case TYPE_TINYINT: {
int8_t val = *reinterpret_cast<const int8_t*>(src);
string s;
s.push_back(val);
return s;
}
case TYPE_SMALLINT: {
int16_t val = *reinterpret_cast<const int16_t*>(src);
return std::to_string(val);
}
case TYPE_INT: {
int32_t val = *reinterpret_cast<const int32_t*>(src);
return std::to_string(val);
}
case TYPE_BIGINT: {
int64_t val = *reinterpret_cast<const int64_t*>(src);
return std::to_string(val);
}
case TYPE_LARGEINT: {
char buf[48];
int len = 48;
char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(src), buf, &len);
return std::string(v, len);
}
case TYPE_FLOAT: {
float val = *reinterpret_cast<const float*>(src);
return std::to_string(val);
}
case TYPE_TIME:
case TYPE_DOUBLE: {
double val = *reinterpret_cast<double*>(src);
return std::to_string(val);
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
case TYPE_OBJECT: {
return (reinterpret_cast<StringValue*>(src))->debug_string();
}
case TYPE_DATE:
case TYPE_DATETIME: {
const DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(src);
char str[MAX_DTVALUE_STR_LEN];
date_value.to_string(str);
return str;
}
case TYPE_DECIMALV2: {
return reinterpret_cast<DecimalV2Value*>(src)->to_string();
}
default:
DCHECK(false) << "Type not implemented: " << slot_type;
return NULL;
}
return NULL;
}


}

57 changes: 57 additions & 0 deletions be/src/runtime/fold_constant_mgr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/tuple_row.h"
#include "util/runtime_profile.h"
#include "exprs/expr_context.h"
#include "exprs/expr.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/PaloInternalService_types.h"

namespace doris {

class TFoldConstantParams;
class TExpr;
class TQueryGlobals;

// This class used to fold constant expr from fe
class FoldConstantMgr {
public:
FoldConstantMgr(ExecEnv* exec_env);
// fold constant expr
Status fold_constant_expr(const TFoldConstantParams& params, PConstantExprResult* response);
// init runtime_state and mem_tracker
Status init(TQueryGlobals query_globals);
// prepare expr
Status prepare_and_open(ExprContext* ctx);

std::string get_result(void* src, PrimitiveType slot_type);

private:
std::unique_ptr<RuntimeState> _runtime_state;
std::shared_ptr<MemTracker> _mem_tracker;
RuntimeProfile* _runtime_profile;
std::unique_ptr<MemPool> _mem_pool;
ExecEnv* _exec_env;
ObjectPool _pool;
static TUniqueId _dummy_id;
};
}

39 changes: 39 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include "runtime/buffer_control_block.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/fold_constant_mgr.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
#include "runtime/runtime_state.h"
#include "service/brpc.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -285,6 +287,43 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
}
st.to_protobuf(response->mutable_status());
}

template<typename T>
void PInternalServiceImpl<T>::fold_constant_expr(
google::protobuf::RpcController* cntl_base,
const PConstantExprRequest* request,
PConstantExprResult* response,
google::protobuf::Closure* done) {

brpc::ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

Status st = Status::OK();
if (request->has_request()) {
st = _fold_constant_expr(request->request(), response);
} else {
// TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
}
if (!st.ok()) {
LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st.get_error_msg();
}
st.to_protobuf(response->mutable_status());
}

template<typename T>
Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_request,
PConstantExprResult* response) {

TFoldConstantParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
}
FoldConstantMgr mgr(_exec_env);
return mgr.fold_constant_expr(t_request, response);
}
template class PInternalServiceImpl<PBackendService>;
template class PInternalServiceImpl<palo::PInternalService>;

Expand Down
Loading