diff --git a/.gitignore b/.gitignore index 34e503496f0a16..b9bce7bf8333e1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ output docs/build gensrc/build fe/target +thirdparty/src diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index c8d41103cdbd81..ec40b7909e5139 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -18,15 +18,17 @@ #include "agent/heartbeat_server.h" #include #include -#include "boost/filesystem.hpp" -#include "thrift/TProcessor.h" -#include "gen_cpp/HeartbeatService.h" -#include "gen_cpp/Status_types.h" + +#include +#include #include "common/status.h" +#include "gen_cpp/HeartbeatService.h" +#include "gen_cpp/Status_types.h" #include "olap/olap_engine.h" #include "olap/utils.h" #include "service/backend_options.h" +#include "util/thrift_server.h" using std::fstream; using std::nothrow; diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index dc78db392dee12..1a36187ca4c0c3 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -31,6 +31,7 @@ namespace doris { const uint32_t HEARTBEAT_INTERVAL = 10; class OLAPEngine; class Status; +class ThriftServer; class HeartbeatServer : public HeartbeatServiceIf { public: diff --git a/be/src/agent/user_resource_listener.cpp b/be/src/agent/user_resource_listener.cpp index 631cd7c795e940..cd563f4570e10a 100644 --- a/be/src/agent/user_resource_listener.cpp +++ b/be/src/agent/user_resource_listener.cpp @@ -25,6 +25,7 @@ #include #include "common/logging.h" #include "gen_cpp/FrontendService.h" +#include "runtime/client_cache.h" namespace doris { @@ -38,7 +39,7 @@ using apache::thrift::transport::TTransportException; UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info) : _master_info(master_info), - _master_client_cache(exec_env->frontend_client_cache()), + _exec_env(exec_env), _cgroups_mgr(*(exec_env->cgroups_mgr())) { } @@ -64,7 +65,7 @@ void UserResourceListener::update_users_resource(int64_t new_version) { // Call fe to get latest user resource Status master_status; // using 500ms as default timeout value - FrontendServiceConnection client(_master_client_cache, + FrontendServiceConnection client(_exec_env->frontend_client_cache(), _master_info.network_address, 500, &master_status); diff --git a/be/src/agent/user_resource_listener.h b/be/src/agent/user_resource_listener.h index 16736d973a38f9..2fb6d9713949c5 100644 --- a/be/src/agent/user_resource_listener.h +++ b/be/src/agent/user_resource_listener.h @@ -28,6 +28,8 @@ namespace doris { +class ExecEnv; + class UserResourceListener : public TopicListener { public: @@ -40,7 +42,7 @@ class UserResourceListener : public TopicListener { const TTopicUpdate& topic_update); private: const TMasterInfo& _master_info; - FrontendServiceClientCache* _master_client_cache; + ExecEnv* _exec_env; CgroupsMgr& _cgroups_mgr; // Call cgroups mgr to update user's cgroups resource share // Also refresh local user resource's cache diff --git a/be/src/common/config.h b/be/src/common/config.h index 9288635f10b74a..84eae915195c0f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -337,42 +337,36 @@ namespace config { // cpu count CONF_Int32(flags_num_cores, "32"); - CONF_Bool(FLAGS_thread_creation_fault_injection, "false"); + CONF_Bool(thread_creation_fault_injection, "false"); // Set this to encrypt and perform an integrity // check on all data spilled to disk during a query - CONF_Bool(FLAGS_disk_spill_encryption, "false"); + CONF_Bool(disk_spill_encryption, "false"); // Writable scratch directories - CONF_String(FLAGS_scratch_dirs, "/tmp"); + CONF_String(scratch_dirs, "/tmp"); // If false and --scratch_dirs contains multiple directories on the same device, // then only the first writable directory is used - CONF_Bool(FLAGS_allow_multiple_scratch_dirs_per_device, "false"); + CONF_Bool(allow_multiple_scratch_dirs_per_device, "false"); // linux transparent huge page - CONF_Bool(FLAGS_madvise_huge_pages, "false"); + CONF_Bool(madvise_huge_pages, "false"); // whether use mmap to allocate memory - CONF_Bool(FLAGS_mmap_buffers, "false"); - - // whether or not user mem pool - CONF_Bool(FLAGS_disable_mem_pools, "false"); + CONF_Bool(mmap_buffers, "false"); // max memory can be allocated by buffer pool - CONF_String(FLAGS_buffer_pool_limit, "80G"); + CONF_String(buffer_pool_limit, "80G"); // clean page can be hold by buffer pool - CONF_String(FLAGS_buffer_pool_clean_pages_limit, "20G"); - - // buffer pool can support min memory allocated - CONF_Int32(FLAGS_min_buffer_size, "1024"); + CONF_String(buffer_pool_clean_pages_limit, "20G"); // Sleep time in seconds between memory maintenance iterations - CONF_Int64(FLAGS_memory_maintenance_sleep_time_s, "10"); + CONF_Int64(memory_maintenance_sleep_time_s, "10"); // Aligement - CONF_Int32(FLAGS_MEMORY_MAX_ALIGNMENT, "16"); + CONF_Int32(memory_max_alignment, "16"); // write buffer size before flush CONF_Int32(write_buffer_size, "104857600"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index fa5aaa6b4b0f28..53a327bb54b5e6 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -81,7 +81,7 @@ void* tcmalloc_gc_thread(void* dummy) { void* memory_maintenance_thread(void* dummy) { while (true) { - sleep(config::FLAGS_memory_maintenance_sleep_time_s); + sleep(config::memory_maintenance_sleep_time_s); ExecEnv* env = ExecEnv::GetInstance(); // ExecEnv may not have been created yet or this may be the catalogd or statestored, // which don't have ExecEnvs. diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp index 33bf2a5ff70798..c66a108cfed537 100644 --- a/be/src/common/status.cpp +++ b/be/src/common/status.cpp @@ -20,7 +20,6 @@ #include #include "common/logging.h" -#include "util/debug_util.h" namespace doris { diff --git a/be/src/common/status.h b/be/src/common/status.h index ff8c1628e03962..040baf8c5ef207 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -25,6 +25,7 @@ #include "common/compiler_util.h" #include "gen_cpp/Status_types.h" // for TStatus #include "gen_cpp/status.pb.h" // for PStatus +#include "util/stack_util.h" // for PStatus namespace doris { diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index 214a41712ebdc7..1a0e96ddb11448 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -40,7 +40,6 @@ #include "runtime/string_value.hpp" #include "runtime/tuple.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" using llvm::BasicBlock; @@ -207,7 +206,7 @@ Status AggregationNode::open(RuntimeState* state) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); VLOG_ROW << "id=" << id() << " input row: " - << print_row(row, _children[0]->row_desc()); + << row->to_string(_children[0]->row_desc()); } } @@ -291,7 +290,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* row->set_tuple(0, output_tuple); if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) { - VLOG_ROW << "output row: " << print_row(row, row_desc()); + VLOG_ROW << "output row: " << row->to_string(row_desc()); row_batch->commit_last_row(); ++_num_rows_returned; diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 1b406b05f28bd4..af305d42dd1790 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -22,7 +22,6 @@ #include "exprs/expr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -191,9 +190,9 @@ std::string BlockingJoinNode::get_left_child_row_string(TupleRow* row) { std::find(_build_tuple_idx_ptr, _build_tuple_idx_ptr + _build_tuple_size, i); if (is_build_tuple != _build_tuple_idx_ptr + _build_tuple_size) { - out << print_tuple(NULL, *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(NULL, *row_desc().tuple_descriptors()[i]); } else { - out << print_tuple(row->get_tuple(i), *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(row->get_tuple(i), *row_desc().tuple_descriptors()[i]); } } diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index d6373c4609e82b..cd0b3b746ec210 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -26,7 +26,6 @@ #include "runtime/dpp_sink_internal.h" #include "exec/broker_scanner.h" #include "exprs/expr.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" namespace doris { @@ -213,7 +212,7 @@ Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* for (int i = 0; i < row_batch->num_rows(); ++i) { TupleRow* row = row_batch->get_row(i); VLOG_ROW << "BrokerScanNode output row: " - << print_tuple(row->get_tuple(0), *_tuple_desc); + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); } } @@ -331,7 +330,7 @@ Status BrokerScanNode::scanner_scan( std::stringstream error_msg; error_msg << "No corresponding partition, partition id: " << partition_id; - _runtime_state->append_error_msg_to_file(print_tuple(tuple, *_tuple_desc), + _runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc), error_msg.str()); continue; } diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index e68e3e2ed7cce2..840c875f9d6695 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -24,7 +24,6 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/row_batch.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -172,7 +171,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "ExchangeNode output batch: " << print_batch(output_batch); + VLOG_ROW << "ExchangeNode output batch: " << output_batch->to_string(); } COUNTER_SET(_rows_returned_counter, _num_rows_returned); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 8f3e682b43b7f0..378f8f5a0a727e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -26,7 +26,6 @@ #include "exprs/slot_ref.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -444,7 +443,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo VLOG_ROW << "probe row: " << get_probe_row_output_string(_current_probe_row); while (_hash_tbl_iterator.has_next()) { TupleRow* matched_build_row = _hash_tbl_iterator.get_row(); - VLOG_ROW << "matched_build_row: " << print_row(matched_build_row, child(1)->row_desc()); + VLOG_ROW << "matched_build_row: " << matched_build_row->to_string(child(1)->row_desc()); if ((_join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN) && _hash_tbl_iterator.matched()) { @@ -508,7 +507,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo _hash_tbl_iterator.next(); if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -528,7 +527,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); _matched_probe = true; @@ -628,7 +627,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo create_output_row(out_row, NULL, build_row); if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -718,9 +717,9 @@ string HashJoinNode::get_probe_row_output_string(TupleRow* probe_row) { std::find(_build_tuple_idx_ptr, _build_tuple_idx_ptr + _build_tuple_size, i); if (is_build_tuple != _build_tuple_idx_ptr + _build_tuple_size) { - out << print_tuple(NULL, *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(NULL, *row_desc().tuple_descriptors()[i]); } else { - out << print_tuple(probe_row->get_tuple(i), *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(probe_row->get_tuple(i), *row_desc().tuple_descriptors()[i]); } } diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp index 692ef848bfb88c..1625febdeb7eac 100644 --- a/be/src/exec/hash_table.cpp +++ b/be/src/exec/hash_table.cpp @@ -25,7 +25,6 @@ #include "runtime/string_value.hpp" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/doris_metrics.h" using llvm::BasicBlock; @@ -313,7 +312,7 @@ std::string HashTable::debug_string(bool skip_empty, const RowDescriptor* desc) if (desc == NULL) { ss << node_idx << "(" << (void*)node->data() << ")"; } else { - ss << (void*)node->data() << " " << print_row(node->data(), *desc); + ss << (void*)node->data() << " " << node->data()->to_string(*desc); } node_idx = node->_next_idx; diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc index 8f4736e82d41ba..da063f92e621cf 100644 --- a/be/src/exec/new_partitioned_aggregation_node.cc +++ b/be/src/exec/new_partitioned_aggregation_node.cc @@ -44,7 +44,6 @@ #include "runtime/tuple_row.h" #include "runtime/tuple.h" #include "udf/udf_internal.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/Exprs_types.h" @@ -313,7 +312,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) { if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); - VLOG_ROW << "input row: " << print_row(row, _children[0]->row_desc()); + VLOG_ROW << "input row: " << row->to_string(_children[0]->row_desc()); } } diff --git a/be/src/exec/new_partitioned_hash_table.cc b/be/src/exec/new_partitioned_hash_table.cc index 56b06c34b7970d..948cefa732fbd8 100644 --- a/be/src/exec/new_partitioned_hash_table.cc +++ b/be/src/exec/new_partitioned_hash_table.cc @@ -31,7 +31,6 @@ #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" -#include "util/debug_util.h" #include "util/doris_metrics.h" #include "common/names.h" @@ -566,7 +565,7 @@ void NewPartitionedHashTable::DebugStringTuple(std::stringstream& ss, HtData& ht } if (desc != NULL) { Tuple* row[num_build_tuples_]; - ss << " " << print_row(GetRow(htdata, reinterpret_cast(row)), *desc); + ss << " " << GetRow(htdata, reinterpret_cast(row))->to_string(*desc); } } diff --git a/be/src/exec/olap_rewrite_node.cpp b/be/src/exec/olap_rewrite_node.cpp index 44317ec4e758a2..611edc4979c753 100644 --- a/be/src/exec/olap_rewrite_node.cpp +++ b/be/src/exec/olap_rewrite_node.cpp @@ -25,7 +25,6 @@ #include "runtime/row_batch.h" #include "runtime/raw_value.h" #include "runtime/tuple.h" -#include "util/debug_util.h" namespace doris { @@ -232,7 +231,7 @@ bool OlapRewriteNode::copy_rows(RuntimeState* state, RowBatch* output_batch) { if (VLOG_ROW_IS_ON) { for (int i = 0; i < output_batch->num_rows(); ++i) { TupleRow* row = output_batch->get_row(i); - VLOG_ROW << "OlapRewriteNode input row: " << print_row(row, row_desc()); + VLOG_ROW << "OlapRewriteNode input row: " << row->to_string(row_desc()); } } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 80cb4e8dbffdf5..cc1897e9ca8808 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -38,6 +38,7 @@ #include "util/runtime_profile.h" #include "util/thread_pool.hpp" #include "util/debug_util.h" +#include "util/priority_thread_pool.hpp" #include "agent/cgroups_mgr.h" #include "common/resource_tls.h" #include @@ -289,7 +290,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo for (int i = 0; i < row_batch->num_rows(); ++i) { TupleRow* row = row_batch->get_row(i); VLOG_ROW << "OlapScanNode output row: " - << print_tuple(row->get_tuple(0), *_tuple_desc); + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); } } __sync_fetch_and_sub(&_buffered_bytes, diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index ef5f2668125700..0db834d0a013aa 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -35,7 +35,6 @@ #include "runtime/row_batch_interface.hpp" #include "runtime/vectorized_row_batch.h" #include "util/progress_updater.h" -#include "util/debug_util.h" namespace doris { @@ -125,7 +124,7 @@ class OlapScanNode : public ScanNode { while (!h.empty()) { HeapType v = h.top(); - s << "\nID: " << v.id << " Value:" << print_tuple(v.tuple, *_tuple_desc); + s << "\nID: " << v.id << " Value:" << Tuple::to_string(v.tuple, *_tuple_desc); h.pop(); } diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index dda2fefdca5279..31b92f9b4e7009 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -260,7 +260,7 @@ Status OlapScanner::get_batch( _convert_row_to_tuple(tuple); if (VLOG_ROW_IS_ON) { - VLOG_ROW << "OlapScanner input row: " << print_tuple(tuple, *_tuple_desc); + VLOG_ROW << "OlapScanner input row: " << Tuple::to_string(tuple, *_tuple_desc); } // 3.4 Set tuple to RowBatch(not commited) @@ -312,7 +312,7 @@ Status OlapScanner::get_batch( } } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "OlapScanner output row: " << print_tuple(tuple, *_tuple_desc); + VLOG_ROW << "OlapScanner output row: " << Tuple::to_string(tuple, *_tuple_desc); } // check direct && pushdown conjuncts success then commit tuple diff --git a/be/src/exec/olap_table_info.cpp b/be/src/exec/olap_table_info.cpp index cc94b54fb3ff52..04e8f9f8f0820d 100644 --- a/be/src/exec/olap_table_info.cpp +++ b/be/src/exec/olap_table_info.cpp @@ -21,7 +21,6 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/string_parser.hpp" namespace doris { @@ -124,8 +123,8 @@ std::string OlapTableSchemaParam::debug_string() const { std::string OlapTablePartition::debug_string(TupleDescriptor* tuple_desc) const { std::stringstream ss; ss << "(id=" << id - << ",start_key=" << print_tuple(start_key, *tuple_desc) - << ",end_key=" << print_tuple(end_key, *tuple_desc) + << ",start_key=" << Tuple::to_string(start_key, *tuple_desc) + << ",end_key=" << Tuple::to_string(end_key, *tuple_desc) << ",num_buckets=" << num_buckets << ",indexes=["; int idx = 0; diff --git a/be/src/exec/olap_table_sink.cpp b/be/src/exec/olap_table_sink.cpp index da6c94cb6a2fe2..5bf3fd93fc54cd 100644 --- a/be/src/exec/olap_table_sink.cpp +++ b/be/src/exec/olap_table_sink.cpp @@ -24,7 +24,6 @@ #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/brpc_stub_cache.h" #include "util/uid_util.h" @@ -554,7 +553,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { uint32_t dist_hash = 0; if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { std::stringstream ss; - ss << "no partition for this tuple. tuple=" << print_tuple(tuple, *_output_tuple_desc); + ss << "no partition for this tuple. tuple=" + << Tuple::to_string(tuple, *_output_tuple_desc); #if BE_TEST LOG(INFO) << ss.str(); #else diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index 506179aeecf76e..dfddb3b85b38bc 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -37,7 +37,6 @@ #include "runtime/tuple.h" #include "runtime/tuple_row.h" #include "udf/udf_internal.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/Exprs_types.h" @@ -240,7 +239,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); VLOG_ROW << "partition-agg-node input row: " - << print_row(row, _children[0]->row_desc()); + << row->to_string(_children[0]->row_desc()); } } @@ -250,7 +249,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { } else if (_probe_expr_ctxs.empty()) { RETURN_IF_ERROR(process_batch_no_grouping(&batch)); } else { - // VLOG_ROW << "partition-agg-node batch: " << print_batch(&batch); + // VLOG_ROW << "partition-agg-node batch: " << batch->to_string(); // There is grouping, so we will do partitioned aggregation. RETURN_IF_ERROR(process_batch(&batch, _ht_ctx.get())); } diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc index 8798bc9738d22d..e14bdd6b09b573 100644 --- a/be/src/exec/partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -27,7 +27,6 @@ #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/string_value.hpp" -#include "util/debug_util.h" #include "util/doris_metrics.h" // using namespace llvm; @@ -391,7 +390,7 @@ void PartitionedHashTable::debug_string_tuple( } if (desc != NULL) { Tuple* row[_num_build_tuples]; - ss << " " << print_row(get_row(htdata, reinterpret_cast(row)), *desc); + ss << " " << get_row(htdata, reinterpret_cast(row))->to_string(*desc); } } diff --git a/be/src/exec/row_batch_list.h b/be/src/exec/row_batch_list.h index 50f5815b0eef1c..1fb2597e2c1c5b 100644 --- a/be/src/exec/row_batch_list.h +++ b/be/src/exec/row_batch_list.h @@ -23,7 +23,7 @@ #include "common/logging.h" #include "runtime/row_batch.h" -#include "util/debug_util.h" +#include "runtime/tuple_row.h" namespace doris { @@ -108,7 +108,7 @@ class RowBatchList { RowBatchList::TupleRowIterator it = iterator(); while (!it.at_end()) { - out << " " << print_row(it.get_row(), desc); + out << " " << it.get_row()->to_string(desc); it.next(); } diff --git a/be/src/exec/schema_scanner/frontend_helper.cpp b/be/src/exec/schema_scanner/frontend_helper.cpp index 5826b9f309fd5b..aa4c7693f8797e 100644 --- a/be/src/exec/schema_scanner/frontend_helper.cpp +++ b/be/src/exec/schema_scanner/frontend_helper.cpp @@ -29,6 +29,7 @@ #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/FrontendService.h" #include "runtime/runtime_state.h" +#include "runtime/exec_env.h" #include "runtime/row_batch.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" @@ -37,6 +38,7 @@ #include "util/network_util.h" #include "util/thrift_util.h" #include "util/runtime_profile.h" +#include "runtime/client_cache.h" namespace doris { diff --git a/be/src/exec/schema_scanner/frontend_helper.h b/be/src/exec/schema_scanner/frontend_helper.h index 5a4a4bcacf87f6..3f86bde2e87b11 100644 --- a/be/src/exec/schema_scanner/frontend_helper.h +++ b/be/src/exec/schema_scanner/frontend_helper.h @@ -19,11 +19,14 @@ #define DORIS_BE_SRC_QUERY_EXEC_SCHEMA_SCANNER_FRONTEND_HELPER_H #include "common/status.h" -#include "runtime/exec_env.h" #include "gen_cpp/FrontendService_types.h" namespace doris { +class ExecEnv; +class FrontendServiceClient; +template class ClientConnection; + // this class is a helper for jni call. easy for unit test class FrontendHelper { public: @@ -59,7 +62,7 @@ class FrontendHelper { static Status rpc( const std::string& ip, const int32_t port, - std::function callback, + std::function&)> callback, int timeout_ms = 5000); private: static ExecEnv* _s_exec_env; diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp index ef634db00f37d6..b962e5e3d319f5 100644 --- a/be/src/exec/select_node.cpp +++ b/be/src/exec/select_node.cpp @@ -21,7 +21,6 @@ #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/raw_value.h" -#include "util/debug_util.h" namespace doris { @@ -125,7 +124,7 @@ bool SelectNode::copy_rows(RowBatch* output_batch) { if (VLOG_ROW_IS_ON) { for (int i = 0; i < output_batch->num_rows(); ++i) { TupleRow* row = output_batch->get_row(i); - VLOG_ROW << "SelectNode input row: " << print_row(row, row_desc()); + VLOG_ROW << "SelectNode input row: " << row->to_string(row_desc()); } } diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 1fbb96a48ebe2c..4f24a8d70617cc 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -29,7 +29,6 @@ #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "util/tuple_row_compare.h" #include @@ -164,7 +163,7 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "TOPN-node output row: " << print_batch(row_batch); + VLOG_ROW << "TOPN-node output row: " << row_batch->to_string(); } *eos = _get_next_iter == _sorted_top_n.end(); diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 0ac1bec8cad4fa..819589a9520a00 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -51,6 +51,7 @@ #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/load_path_mgr.h" +#include "runtime/client_cache.h" #include "gen_cpp/MasterService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/FrontendService.h" diff --git a/be/src/olap/file_helper.cpp b/be/src/olap/file_helper.cpp index 2ca937e8470d73..8d536043d565fc 100644 --- a/be/src/olap/file_helper.cpp +++ b/be/src/olap/file_helper.cpp @@ -27,7 +27,6 @@ #include "olap/olap_common.h" #include "olap/olap_define.h" -#include "olap/olap_engine.h" #include "olap/utils.h" #include "util/debug_util.h" @@ -35,6 +34,8 @@ using std::string; namespace doris { +Cache* FileHandler::_s_fd_cache; + FileHandler::FileHandler() : _fd(-1), _wr_length(0), @@ -85,7 +86,7 @@ OLAPStatus FileHandler::open_with_cache(const string& file_name, int flag) { } CacheKey key(file_name.c_str(), file_name.size()); - Cache* fd_cache = OLAPEngine::get_instance()->file_descriptor_lru_cache(); + Cache* fd_cache = get_fd_cache(); _cache_handle = fd_cache->lookup(key); if (NULL != _cache_handle) { FileDescriptor* file_desc = @@ -146,8 +147,7 @@ OLAPStatus FileHandler::open_with_mode(const string& file_name, int flag, int mo } OLAPStatus FileHandler::release() { - Cache* fd_cache = OLAPEngine::get_instance()->file_descriptor_lru_cache(); - fd_cache->release(_cache_handle); + get_fd_cache()->release(_cache_handle); _cache_handle = NULL; _is_using_cache = false; return OLAP_SUCCESS; diff --git a/be/src/olap/file_helper.h b/be/src/olap/file_helper.h index 0f3a8f841d8c82..2f4279c09a7643 100644 --- a/be/src/olap/file_helper.h +++ b/be/src/olap/file_helper.h @@ -101,7 +101,16 @@ class FileHandler { SAFE_DELETE(file_desc); } + static Cache* get_fd_cache() { + return _s_fd_cache; + } + static void set_fd_cache(Cache* cache) { + _s_fd_cache = cache; + } + private: + static Cache* _s_fd_cache; + int _fd; off_t _wr_length; const int64_t _cache_threshold = 1<<19; diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index 830bc571a10964..0b40a93be11bdd 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -115,7 +115,6 @@ OLAPEngine::OLAPEngine(const EngineOptions& options) _is_all_cluster_id_exist(true), _is_drop_tables(false), _global_table_id(0), - _file_descriptor_lru_cache(NULL), _index_stream_lru_cache(NULL), _tablet_stat_cache_update_time_ms(0), _snapshot_base_id(0) { @@ -303,12 +302,13 @@ OLAPStatus OLAPEngine::open() { _update_storage_medium_type_count(); - _file_descriptor_lru_cache = new_lru_cache(config::file_descriptor_cache_capacity); - if (_file_descriptor_lru_cache == NULL) { + auto cache = new_lru_cache(config::file_descriptor_cache_capacity); + if (cache == nullptr) { OLAP_LOG_WARNING("failed to init file descriptor LRUCache"); _tablet_map.clear(); return OLAP_ERR_INIT_FAILED; } + FileHandler::set_fd_cache(cache); // 初始化LRUCache // cache大小可通过配置文件配置 @@ -605,7 +605,8 @@ OLAPStatus OLAPEngine::_get_root_path_capacity( OLAPStatus OLAPEngine::clear() { // 删除lru中所有内容,其实进程退出这么做本身意义不大,但对单测和更容易发现问题还是有很大意义的 - SAFE_DELETE(_file_descriptor_lru_cache); + delete FileHandler::get_fd_cache(); + FileHandler::set_fd_cache(nullptr); SAFE_DELETE(_index_stream_lru_cache); _tablet_map.clear(); @@ -1651,7 +1652,7 @@ bool OLAPEngine::_can_do_compaction(OLAPTablePtr table) { void OLAPEngine::start_clean_fd_cache() { OLAP_LOG_TRACE("start clean file descritpor cache"); - _file_descriptor_lru_cache->prune(); + FileHandler::get_fd_cache()->prune(); OLAP_LOG_TRACE("end clean file descritpor cache"); } diff --git a/be/src/olap/olap_engine.h b/be/src/olap/olap_engine.h index 1cd4e704b9fcb0..15a8b904d698db 100644 --- a/be/src/olap/olap_engine.h +++ b/be/src/olap/olap_engine.h @@ -197,10 +197,6 @@ class OLAPEngine { return _index_stream_lru_cache; } - Cache* file_descriptor_lru_cache() { - return _file_descriptor_lru_cache; - } - // 清理trash和snapshot文件,返回清理后的磁盘使用量 OLAPStatus start_trash_sweep(double *usage); diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 9eabd667579892..5dcf63bb403f0d 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -36,6 +36,7 @@ add_library(Runtime STATIC datetime_value.cpp descriptors.cpp exec_env.cpp + exec_env_init.cpp lib_cache.cpp mem_pool.cpp plan_fragment_executor.cpp diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc index 0e421263f1074a..2e07a4bf4c8b44 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.cc +++ b/be/src/runtime/bufferpool/buffer_allocator.cc @@ -492,7 +492,7 @@ BufferPool::FreeBufferArena::~FreeBufferArena() { void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) { lock_guard al(lock_); - if (config::FLAGS_disable_mem_pools) { + if (config::disable_mem_pools) { int64_t len = buffer.len(); parent_->system_allocator_->Free(move(buffer)); parent_->system_bytes_remaining_.add(len); @@ -635,7 +635,7 @@ std::pair BufferPool::FreeBufferArena::FreeSystemMemory( } void BufferPool::FreeBufferArena::AddCleanPage(Page* page) { - bool eviction_needed = config::FLAGS_disable_mem_pools + bool eviction_needed = config::disable_mem_pools || DecreaseBytesRemaining( page->len, true, &parent_->clean_page_bytes_remaining_) == 0; lock_guard al(lock_); diff --git a/be/src/runtime/bufferpool/system_allocator.cc b/be/src/runtime/bufferpool/system_allocator.cc index 8fe4cf59ba6b1a..0b79dc66b6954d 100644 --- a/be/src/runtime/bufferpool/system_allocator.cc +++ b/be/src/runtime/bufferpool/system_allocator.cc @@ -63,7 +63,7 @@ Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) DCHECK(BitUtil::IsPowerOf2(len)) << len; uint8_t* buffer_mem; - if (config::FLAGS_mmap_buffers) { + if (config::mmap_buffers) { RETURN_IF_ERROR(AllocateViaMMap(len, &buffer_mem)); } else { RETURN_IF_ERROR(AllocateViaMalloc(len, &buffer_mem)); @@ -74,7 +74,7 @@ Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { int64_t map_len = len; - bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; if (use_huge_pages) { // Map an extra huge page so we can fix up the alignment if needed. map_len += HUGE_PAGE_SIZE; @@ -116,7 +116,7 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { } Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { - bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; // Allocate, aligned to the page size that we expect to back the memory range. // This ensures that it can be backed by a whole pages, rather than parts of pages. size_t alignment = use_huge_pages ? HUGE_PAGE_SIZE : SMALL_PAGE_SIZE; @@ -144,11 +144,11 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { } void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) { - if (config::FLAGS_mmap_buffers) { + if (config::mmap_buffers) { int rc = munmap(buffer.data(), buffer.len()); DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno; } else { - bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; if (use_huge_pages) { // Undo the madvise so that is isn't a candidate to be newly backed by huge pages. // We depend on TCMalloc's "aggressive decommit" mode decommitting the physical diff --git a/be/src/runtime/data_spliter.cpp b/be/src/runtime/data_spliter.cpp index 08158fba704ca1..eb128367053cb0 100644 --- a/be/src/runtime/data_spliter.cpp +++ b/be/src/runtime/data_spliter.cpp @@ -32,7 +32,6 @@ #include "runtime/load_path_mgr.h" #include "runtime/mem_tracker.h" #include "util/runtime_profile.h" -#include "util/debug_util.h" #include "util/file_utils.h" #include "gen_cpp/DataSinks_types.h" @@ -240,7 +239,7 @@ Status DataSpliter::process_one_row(RuntimeState* state, TupleRow* row) { state->set_normal_row_number(state->get_normal_row_number() - 1); state->append_error_msg_to_file( - print_row(row, _row_desc), + row->to_string(_row_desc), status.get_error_msg()); return Status::OK; } diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp index 47b7c14929c49e..0604d128888d24 100644 --- a/be/src/runtime/dpp_sink.cpp +++ b/be/src/runtime/dpp_sink.cpp @@ -35,6 +35,7 @@ #include "gen_cpp/Types_types.h" #include "util/count_down_latch.hpp" #include "util/debug_util.h" +#include "util/thread_pool.hpp" #include "olap/field.h" namespace doris { diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index c28dd07231f559..4c3321a4158695 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -29,6 +29,7 @@ #include "runtime/fragment_mgr.h" #include "runtime/data_spliter.h" #include "runtime/runtime_state.h" +#include "runtime/client_cache.h" #include "util/file_utils.h" #include "gen_cpp/MasterService_types.h" #include "gen_cpp/HeartbeatService_types.h" diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 652887bc3429bb..8eea2c9afc6210 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -17,291 +17,18 @@ #include "runtime/exec_env.h" -#include - -#include - -#include "common/logging.h" -#include "runtime/broker_mgr.h" -#include "runtime/bufferpool/buffer_pool.h" -#include "runtime/client_cache.h" -#include "runtime/data_stream_mgr.h" -#include "runtime/disk_io_mgr.h" -#include "runtime/result_buffer_mgr.h" -#include "runtime/mem_tracker.h" -#include "runtime/thread_resource_mgr.h" -#include "runtime/fragment_mgr.h" -#include "runtime/tablet_writer_mgr.h" -#include "runtime/tmp_file_mgr.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "util/metrics.h" -#include "util/network_util.h" -#include "http/web_page_handler.h" -#include "http/default_path_handlers.h" -#include "util/parse_util.h" -#include "util/mem_info.h" -#include "util/debug_util.h" -#include "http/ev_http_server.h" -#include "http/action/mini_load.h" -#include "http/action/checksum_action.h" -#include "http/action/health_action.h" -#include "http/action/reload_tablet_action.h" -#include "http/action/restore_tablet_action.h" -#include "http/action/snapshot_action.h" -#include "http/action/pprof_actions.h" -#include "http/action/metrics_action.h" -#include "http/action/meta_action.h" -#include "http/action/stream_load.h" -#include "http/download_action.h" -#include "http/monitor_action.h" -#include "http/http_method.h" -#include "olap/olap_engine.h" -#include "util/network_util.h" -#include "util/bfd_parser.h" -#include "runtime/etl_job_mgr.h" -#include "runtime/load_path_mgr.h" -#include "runtime/load_stream_mgr.h" -#include "runtime/pull_load_task_mgr.h" -#include "runtime/snapshot_loader.h" -#include "util/pretty_printer.h" -#include "util/doris_metrics.h" -#include "util/brpc_stub_cache.h" -#include "gen_cpp/BackendService.h" -#include "gen_cpp/FrontendService.h" -#include "gen_cpp/TPaloBrokerService.h" #include "gen_cpp/HeartbeatService_types.h" namespace doris { -ExecEnv* ExecEnv::_exec_env = nullptr; - -ExecEnv::ExecEnv() - : _thread_mgr(new ThreadResourceMgr), - _master_info(new TMasterInfo()), - _load_stream_mgr(new LoadStreamMgr()), - _brpc_stub_cache(new BrpcStubCache()) { +ExecEnv::ExecEnv() { } -ExecEnv::ExecEnv(const std::vector& paths) : - _store_paths(paths), - _stream_mgr(new DataStreamMgr()), - _result_mgr(new ResultBufferMgr()), - _client_cache(new BackendServiceClientCache()), - _frontend_client_cache(new FrontendServiceClientCache()), - _broker_client_cache(new BrokerServiceClientCache()), - _ev_http_server(new EvHttpServer(config::webserver_port, config::webserver_num_workers)), - _web_page_handler(new WebPageHandler(_ev_http_server.get())), - _mem_tracker(NULL), - _pool_mem_trackers(new PoolMemTrackerRegistry), - _thread_mgr(new ThreadResourceMgr), - _thread_pool(new PriorityThreadPool( - config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size)), - _etl_thread_pool(new ThreadPool( - config::etl_thread_pool_size, - config::etl_thread_pool_queue_size)), - _cgroups_mgr(new CgroupsMgr(this, config::doris_cgroups)), - _fragment_mgr(new FragmentMgr(this)), - _master_info(new TMasterInfo()), - _etl_job_mgr(new EtlJobMgr(this)), - _load_path_mgr(new LoadPathMgr(this)), - _disk_io_mgr(new DiskIoMgr()), - _tmp_file_mgr(new TmpFileMgr(this)), - _bfd_parser(BfdParser::create()), - _pull_load_task_mgr(new PullLoadTaskMgr(config::pull_load_task_dir)), - _broker_mgr(new BrokerMgr(this)), - _tablet_writer_mgr(new TabletWriterMgr(this)), - _load_stream_mgr(new LoadStreamMgr()), - _snapshot_loader(new SnapshotLoader(this)), - _brpc_stub_cache(new BrpcStubCache()), - _enable_webserver(true), - _tz_database(TimezoneDatabase()) { - _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); - _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); - _broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); - _result_mgr->init(); - _cgroups_mgr->init_cgroups(); - _etl_job_mgr->init(); - Status status = _load_path_mgr->init(); - if (!status.ok()) { - LOG(ERROR) << "load path mgr init failed." << status.get_error_msg(); - exit(-1); - } - status = _pull_load_task_mgr->init(); - if (!status.ok()) { - LOG(ERROR) << "pull load task manager init failed." << status.get_error_msg(); - exit(-1); - } - _broker_mgr->init(); - _exec_env = this; -} - -ExecEnv::~ExecEnv() {} - -Status ExecEnv::init_for_tests() { - _mem_tracker.reset(new MemTracker(-1)); - return Status::OK; -} - -Status ExecEnv::start_services() { - LOG(INFO) << "Starting global services"; - - // Initialize global memory limit. - int64_t bytes_limit = 0; - bool is_percent = false; - // --mem_limit="" means no memory limit - bytes_limit = ParseUtil::parse_mem_spec(config::mem_limit, &is_percent); - - if (bytes_limit < 0) { - return Status("Failed to parse mem limit from '" + config::mem_limit + "'."); - } - - std::stringstream ss; - if (!BitUtil::IsPowerOf2(config::FLAGS_min_buffer_size)) { - ss << "--min_buffer_size must be a power-of-two: " << config::FLAGS_min_buffer_size; - return Status(ss.str()); - } - - int64_t buffer_pool_limit = ParseUtil::parse_mem_spec(config::FLAGS_buffer_pool_limit, - &is_percent); - if (buffer_pool_limit <= 0) { - ss << "Invalid --buffer_pool_limit value, must be a percentage or " - "positive bytes value or percentage: " << config::FLAGS_buffer_pool_limit; - return Status(ss.str()); - } - buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, config::FLAGS_min_buffer_size); - - int64_t clean_pages_limit = ParseUtil::parse_mem_spec(config::FLAGS_buffer_pool_clean_pages_limit, - &is_percent); - if (clean_pages_limit <= 0) { - ss << "Invalid --buffer_pool_clean_pages_limit value, must be a percentage or " - "positive bytes value or percentage: " << config::FLAGS_buffer_pool_clean_pages_limit; - return Status(ss.str()); - } - - init_buffer_pool(config::FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit); - // Limit of 0 means no memory limit. - if (bytes_limit > 0) { - _mem_tracker.reset(new MemTracker(bytes_limit)); - } - - if (bytes_limit > MemInfo::physical_mem()) { - LOG(WARNING) << "Memory limit " - << PrettyPrinter::print(bytes_limit, TUnit::BYTES) - << " exceeds physical memory of " - << PrettyPrinter::print(MemInfo::physical_mem(), - TUnit::BYTES); - } - - LOG(INFO) << "Using global memory limit: " - << PrettyPrinter::print(bytes_limit, TUnit::BYTES); - - RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker.get())); - - // Start services in order to ensure that dependencies between them are met - if (_enable_webserver) { - RETURN_IF_ERROR(start_webserver()); - } else { - LOG(INFO) << "Webserver is disabled"; - } - - RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::metrics())); - - return Status::OK; -} - -Status ExecEnv::start_webserver() { - add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get()); - _ev_http_server->register_handler(HttpMethod::PUT, - "/api/{db}/{table}/_load", - new MiniLoadAction(this)); - _ev_http_server->register_handler(HttpMethod::PUT, - "/api/{db}/{table}/_stream_load", - new StreamLoadAction(this)); - - std::vector allow_paths; - for (auto& path : _store_paths) { - allow_paths.emplace_back(path.path); - } - DownloadAction* download_action = new DownloadAction(this, allow_paths); - // = new DownloadAction(this, config::mini_load_download_path); - _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); - _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); - - DownloadAction* tablet_download_action = new DownloadAction(this, allow_paths); - _ev_http_server->register_handler(HttpMethod::HEAD, - "/api/_tablet/_download", - tablet_download_action); - _ev_http_server->register_handler(HttpMethod::GET, - "/api/_tablet/_download", - tablet_download_action); - - DownloadAction* error_log_download_action = new DownloadAction( - this, _load_path_mgr->get_load_error_file_dir()); - _ev_http_server->register_handler( - HttpMethod::GET, "/api/_load_error_log", error_log_download_action); - _ev_http_server->register_handler( - HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); - - // Register monitor - MonitorAction* monitor_action = new MonitorAction(); - monitor_action->register_module("etl_mgr", etl_job_mgr()); - monitor_action->register_module("fragment_mgr", fragment_mgr()); - _ev_http_server->register_handler(HttpMethod::GET, "/_monitor/{module}", monitor_action); - - // Register BE health action - HealthAction* health_action = new HealthAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); - - // register pprof actions - PprofActions::setup(this, _ev_http_server.get()); - - { - auto action = _object_pool.add(new MetricsAction(DorisMetrics::metrics())); - _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); - } - - MetaAction* meta_action = new MetaAction(HEADER); - _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); - -#ifndef BE_TEST - // Register BE checksum action - ChecksumAction* checksum_action = new ChecksumAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); - - // Register BE reload tablet action - ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); - - RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(this); - _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); - - // Register BE snapshot action - SnapshotAction* snapshot_action = new SnapshotAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); -#endif - - RETURN_IF_ERROR(_ev_http_server->start()); - return Status::OK; -} - -uint32_t ExecEnv::cluster_id() { - return OLAPEngine::get_instance()->effective_cluster_id(); -} - -void ExecEnv::init_buffer_pool(int64_t min_page_size, int64_t capacity, int64_t clean_pages_limit) { - DCHECK(_buffer_pool == nullptr); - _buffer_pool.reset(new BufferPool(min_page_size, capacity, clean_pages_limit)); - _buffer_reservation.reset(new ReservationTracker); - _buffer_reservation->InitRootTracker(nullptr, capacity); +ExecEnv::~ExecEnv() { } const std::string& ExecEnv::token() const { return _master_info->token; } -MetricRegistry* ExecEnv::metrics() const { - return DorisMetrics::metrics(); -} - } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index dd3884b2d97382..9d295512898569 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -18,49 +18,44 @@ #ifndef DORIS_BE_RUNTIME_EXEC_ENV_H #define DORIS_BE_RUNTIME_EXEC_ENV_H -#include -#include -#include - -#include "agent/cgroups_mgr.h" #include "common/status.h" -#include "common/object_pool.h" -#include "exprs/timestamp_functions.h" -#include "runtime/client_cache.h" -#include "runtime/lib_cache.h" -#include "util/thread_pool.hpp" -#include "util/priority_thread_pool.hpp" -#include "util/thread_pool.hpp" #include "olap/options.h" namespace doris { +class BfdParser; +class BrokerMgr; +class BrpcStubCache; +class BufferPool; +class CgroupsMgr; class DataStreamMgr; -class ResultBufferMgr; -class TestExecEnv; +class DiskIoMgr; +class EtlJobMgr; class EvHttpServer; -class WebPageHandler; -class MemTracker; -class PoolMemTrackerRegistry; -class ThreadResourceMgr; class FragmentMgr; -class TMasterInfo; -class EtlJobMgr; class LoadPathMgr; -class DiskIoMgr; -class TmpFileMgr; -class BfdParser; -class PullLoadTaskMgr; -class BrokerMgr; +class LoadStreamMgr; +class MemTracker; class MetricRegistry; -class BufferPool; +class OLAPEngine; +class PoolMemTrackerRegistry; +class PriorityThreadPool; +class PullLoadTaskMgr; class ReservationTracker; -class TabletWriterMgr; -class LoadStreamMgr; -class ConnectionManager; +class ResultBufferMgr; class SnapshotLoader; -class BrpcStubCache; -class OLAPEngine; +class TMasterInfo; +class TabletWriterMgr; +class TestExecEnv; +class ThreadPool; +class ThreadResourceMgr; +class TmpFileMgr; +class WebPageHandler; + +class BackendServiceClient; +class FrontendServiceClient; +class TPaloBrokerServiceClient; +template class ClientCache; // Execution environment for queries/plan fragments. // Contains all required global structures, and handles to @@ -68,184 +63,101 @@ class OLAPEngine; // once to properly initialise service state. class ExecEnv { public: - ExecEnv(const std::vector& store_paths); + // Initial exec enviorment. must call this to init all + static Status init(ExecEnv* env, const std::vector& store_paths); + static void destroy(ExecEnv* exec_env); - // only used for test - ExecEnv(); - - /// Returns the first created exec env instance. In a normal impalad, this is + /// Returns the first created exec env instance. In a normal doris, this is /// the only instance. In test setups with multiple ExecEnv's per process, /// we return the most recently created instance. - static ExecEnv* GetInstance() { return _exec_env; } + static ExecEnv* GetInstance() { + static ExecEnv s_exec_env; + return &s_exec_env; + } + + // only used for test + ExecEnv(); // Empty destructor because the compiler-generated one requires full // declarations for classes in scoped_ptrs. - virtual ~ExecEnv(); - - uint32_t cluster_id(); + ~ExecEnv(); const std::string& token() const; - - MetricRegistry* metrics() const; - - DataStreamMgr* stream_mgr() { - return _stream_mgr.get(); - } - ResultBufferMgr* result_mgr() { - return _result_mgr.get(); - } - BackendServiceClientCache* client_cache() { - return _client_cache.get(); - } - FrontendServiceClientCache* frontend_client_cache() { - return _frontend_client_cache.get(); - } - BrokerServiceClientCache* broker_client_cache() { - return _broker_client_cache.get(); - } - WebPageHandler* web_page_handler() { - return _web_page_handler.get(); - } - MemTracker* process_mem_tracker() { - return _mem_tracker.get(); - } - PoolMemTrackerRegistry* pool_mem_trackers() { - return _pool_mem_trackers.get(); - } - ThreadResourceMgr* thread_mgr() { - return _thread_mgr.get(); - } - PriorityThreadPool* thread_pool() { - return _thread_pool.get(); - } - ThreadPool* etl_thread_pool() { - return _etl_thread_pool.get(); - } - CgroupsMgr* cgroups_mgr() { - return _cgroups_mgr.get(); - } - FragmentMgr* fragment_mgr() { - return _fragment_mgr.get(); - } - TMasterInfo* master_info() { - return _master_info.get(); - } - EtlJobMgr* etl_job_mgr() { - return _etl_job_mgr.get(); - } - LoadPathMgr* load_path_mgr() { - return _load_path_mgr.get(); - } - DiskIoMgr* disk_io_mgr() { - return _disk_io_mgr.get(); - } - TmpFileMgr* tmp_file_mgr() { - return _tmp_file_mgr.get(); - } - - BfdParser* bfd_parser() const { - return _bfd_parser.get(); - } - - PullLoadTaskMgr* pull_load_task_mgr() const { - return _pull_load_task_mgr.get(); - } - - BrokerMgr* broker_mgr() const { - return _broker_mgr.get(); - } - - SnapshotLoader* snapshot_loader() const { - return _snapshot_loader.get(); - } - - BrpcStubCache* brpc_stub_cache() const { - return _brpc_stub_cache.get(); - } - - void set_enable_webserver(bool enable) { - _enable_webserver = enable; - } - - // Starts any dependent services in their correct order - virtual Status start_services(); - - // Initializes the exec env for running FE tests. - Status init_for_tests(); - - ReservationTracker* buffer_reservation() { - return _buffer_reservation.get(); - } - - BufferPool* buffer_pool() { - return _buffer_pool.get(); - } - - TabletWriterMgr* tablet_writer_mgr() { - return _tablet_writer_mgr.get(); - } - - LoadStreamMgr* load_stream_mgr() { - return _load_stream_mgr.get(); - } - - const std::vector& store_paths() const { - return _store_paths; - } - - void set_store_paths(const std::vector& paths) { - _store_paths = paths; - } - + MetricRegistry* metrics() const { return _metrics; } + DataStreamMgr* stream_mgr() { return _stream_mgr; } + ResultBufferMgr* result_mgr() { return _result_mgr; } + ClientCache* client_cache() { return _client_cache; } + ClientCache* frontend_client_cache() { return _frontend_client_cache; } + ClientCache* broker_client_cache() { return _broker_client_cache; } + MemTracker* process_mem_tracker() { return _mem_tracker; } + PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } + ThreadResourceMgr* thread_mgr() { return _thread_mgr; } + PriorityThreadPool* thread_pool() { return _thread_pool; } + ThreadPool* etl_thread_pool() { return _etl_thread_pool; } + CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } + FragmentMgr* fragment_mgr() { return _fragment_mgr; } + TMasterInfo* master_info() { return _master_info; } + EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; } + LoadPathMgr* load_path_mgr() { return _load_path_mgr; } + DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; } + TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; } + BfdParser* bfd_parser() const { return _bfd_parser; } + PullLoadTaskMgr* pull_load_task_mgr() const { return _pull_load_task_mgr; } + BrokerMgr* broker_mgr() const { return _broker_mgr; } + SnapshotLoader* snapshot_loader() const { return _snapshot_loader; } + BrpcStubCache* brpc_stub_cache() const { return _brpc_stub_cache; } + ReservationTracker* buffer_reservation() { return _buffer_reservation; } + BufferPool* buffer_pool() { return _buffer_pool; } + TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; } + LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; } + const std::vector& store_paths() const { return _store_paths; } + void set_store_paths(const std::vector& paths) { _store_paths = paths; } OLAPEngine* olap_engine() { return _olap_engine; } - void set_olap_engine(OLAPEngine* olap_engine) { _olap_engine = olap_engine; } private: - Status start_webserver(); - std::vector _store_paths; - // Leave protected so that subclasses can override - boost::scoped_ptr _stream_mgr; - boost::scoped_ptr _result_mgr; - boost::scoped_ptr _client_cache; - boost::scoped_ptr _frontend_client_cache; - std::unique_ptr_broker_client_cache; - boost::scoped_ptr _ev_http_server; - boost::scoped_ptr _web_page_handler; - boost::scoped_ptr _mem_tracker; - boost::scoped_ptr _pool_mem_trackers; - boost::scoped_ptr _thread_mgr; - boost::scoped_ptr _thread_pool; - boost::scoped_ptr _etl_thread_pool; - boost::scoped_ptr _cgroups_mgr; - boost::scoped_ptr _fragment_mgr; - boost::scoped_ptr _master_info; - boost::scoped_ptr _etl_job_mgr; - boost::scoped_ptr _load_path_mgr; - boost::scoped_ptr _disk_io_mgr; - boost::scoped_ptr _tmp_file_mgr; - - std::unique_ptr _bfd_parser; - std::unique_ptr _pull_load_task_mgr; - std::unique_ptr _broker_mgr; - std::unique_ptr _tablet_writer_mgr; - std::unique_ptr _load_stream_mgr; - std::unique_ptr _snapshot_loader; - std::unique_ptr _brpc_stub_cache; - bool _enable_webserver; - - boost::scoped_ptr _buffer_reservation; - boost::scoped_ptr _buffer_pool; + Status _init(const std::vector& store_paths); + void _destory(); - OLAPEngine* _olap_engine = nullptr; + Status _init_mem_tracker(); + /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. + void _init_buffer_pool(int64_t min_page_len, + int64_t capacity, int64_t clean_pages_limit); - ObjectPool _object_pool; private: - static ExecEnv* _exec_env; - TimezoneDatabase _tz_database; + std::vector _store_paths; + // Leave protected so that subclasses can override + MetricRegistry* _metrics = nullptr; + DataStreamMgr* _stream_mgr = nullptr; + ResultBufferMgr* _result_mgr = nullptr; + ClientCache* _client_cache = nullptr; + ClientCache* _frontend_client_cache = nullptr; + ClientCache* _broker_client_cache = nullptr; + MemTracker* _mem_tracker = nullptr; + PoolMemTrackerRegistry* _pool_mem_trackers = nullptr; + ThreadResourceMgr* _thread_mgr = nullptr; + PriorityThreadPool* _thread_pool = nullptr; + ThreadPool* _etl_thread_pool = nullptr; + CgroupsMgr* _cgroups_mgr = nullptr; + FragmentMgr* _fragment_mgr = nullptr; + TMasterInfo* _master_info = nullptr; + EtlJobMgr* _etl_job_mgr = nullptr; + LoadPathMgr* _load_path_mgr = nullptr; + DiskIoMgr* _disk_io_mgr = nullptr; + TmpFileMgr* _tmp_file_mgr = nullptr; + + BfdParser* _bfd_parser = nullptr; + PullLoadTaskMgr* _pull_load_task_mgr = nullptr; + BrokerMgr* _broker_mgr = nullptr; + TabletWriterMgr* _tablet_writer_mgr = nullptr; + LoadStreamMgr* _load_stream_mgr = nullptr; + SnapshotLoader* _snapshot_loader = nullptr; + BrpcStubCache* _brpc_stub_cache = nullptr; + + ReservationTracker* _buffer_reservation = nullptr; + BufferPool* _buffer_pool = nullptr; - /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. - void init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); + OLAPEngine* _olap_engine = nullptr; }; } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp new file mode 100644 index 00000000000000..1584567e259b5e --- /dev/null +++ b/be/src/runtime/exec_env_init.cpp @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/exec_env.h" + +#include + +#include "common/config.h" +#include "common/logging.h" +#include "runtime/broker_mgr.h" +#include "runtime/bufferpool/buffer_pool.h" +#include "runtime/client_cache.h" +#include "runtime/data_stream_mgr.h" +#include "runtime/disk_io_mgr.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/mem_tracker.h" +#include "runtime/thread_resource_mgr.h" +#include "runtime/fragment_mgr.h" +#include "runtime/tablet_writer_mgr.h" +#include "runtime/tmp_file_mgr.h" +#include "runtime/bufferpool/reservation_tracker.h" +#include "util/metrics.h" +#include "util/network_util.h" +#include "util/parse_util.h" +#include "util/mem_info.h" +#include "util/debug_util.h" +#include "olap/olap_engine.h" +#include "util/network_util.h" +#include "util/bfd_parser.h" +#include "runtime/etl_job_mgr.h" +#include "runtime/load_path_mgr.h" +#include "runtime/load_stream_mgr.h" +#include "runtime/pull_load_task_mgr.h" +#include "runtime/snapshot_loader.h" +#include "util/pretty_printer.h" +#include "util/doris_metrics.h" +#include "util/brpc_stub_cache.h" +#include "util/priority_thread_pool.hpp" +#include "agent/cgroups_mgr.h" +#include "util/thread_pool.hpp" +#include "gen_cpp/BackendService.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "gen_cpp/HeartbeatService_types.h" + +namespace doris { + +Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths) { + return env->_init(store_paths); +} + +Status ExecEnv::_init(const std::vector& store_paths) { + _store_paths = store_paths; + + _metrics = DorisMetrics::metrics(); + _stream_mgr = new DataStreamMgr(); + _result_mgr = new ResultBufferMgr(); + _client_cache = new BackendServiceClientCache(); + _frontend_client_cache = new FrontendServiceClientCache(); + _broker_client_cache = new BrokerServiceClientCache(); + _mem_tracker = nullptr; + _pool_mem_trackers = new PoolMemTrackerRegistry(); + _thread_mgr = new ThreadResourceMgr(); + _thread_pool = new PriorityThreadPool( + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size); + _etl_thread_pool = new ThreadPool( + config::etl_thread_pool_size, + config::etl_thread_pool_queue_size); + _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); + _fragment_mgr = new FragmentMgr(this); + _master_info = new TMasterInfo(); + _etl_job_mgr = new EtlJobMgr(this); + _load_path_mgr = new LoadPathMgr(this); + _disk_io_mgr = new DiskIoMgr(); + _tmp_file_mgr = new TmpFileMgr(this), + _bfd_parser = BfdParser::create(); + _pull_load_task_mgr = new PullLoadTaskMgr(config::pull_load_task_dir); + _broker_mgr = new BrokerMgr(this); + _tablet_writer_mgr = new TabletWriterMgr(this); + _load_stream_mgr = new LoadStreamMgr(); + _snapshot_loader = new SnapshotLoader(this); + _brpc_stub_cache = new BrpcStubCache(); + + _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); + _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); + _broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); + _result_mgr->init(); + _cgroups_mgr->init_cgroups(); + _etl_job_mgr->init(); + Status status = _load_path_mgr->init(); + if (!status.ok()) { + LOG(ERROR) << "load path mgr init failed." << status.get_error_msg(); + exit(-1); + } + status = _pull_load_task_mgr->init(); + if (!status.ok()) { + LOG(ERROR) << "pull load task manager init failed." << status.get_error_msg(); + exit(-1); + } + _broker_mgr->init(); + return _init_mem_tracker(); +} + +Status ExecEnv::_init_mem_tracker() { + // Initialize global memory limit. + int64_t bytes_limit = 0; + bool is_percent = false; + std::stringstream ss; + // --mem_limit="" means no memory limit + bytes_limit = ParseUtil::parse_mem_spec(config::mem_limit, &is_percent); + if (bytes_limit < 0) { + ss << "Failed to parse mem limit from '" + config::mem_limit + "'."; + return Status(ss.str()); + } + + if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { + ss << "--min_buffer_size must be a power-of-two: " << config::min_buffer_size; + return Status(ss.str()); + } + + int64_t buffer_pool_limit = ParseUtil::parse_mem_spec( + config::buffer_pool_limit, &is_percent); + if (buffer_pool_limit <= 0) { + ss << "Invalid --buffer_pool_limit value, must be a percentage or " + "positive bytes value or percentage: " << config::buffer_pool_limit; + return Status(ss.str()); + } + buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, config::min_buffer_size); + + int64_t clean_pages_limit = ParseUtil::parse_mem_spec( + config::buffer_pool_clean_pages_limit, &is_percent); + if (clean_pages_limit <= 0) { + ss << "Invalid --buffer_pool_clean_pages_limit value, must be a percentage or " + "positive bytes value or percentage: " << config::buffer_pool_clean_pages_limit; + return Status(ss.str()); + } + + _init_buffer_pool(config::min_buffer_size, buffer_pool_limit, clean_pages_limit); + + // Limit of 0 means no memory limit. + if (bytes_limit > 0) { + _mem_tracker = new MemTracker(bytes_limit); + } + + if (bytes_limit > MemInfo::physical_mem()) { + LOG(WARNING) << "Memory limit " + << PrettyPrinter::print(bytes_limit, TUnit::BYTES) + << " exceeds physical memory of " + << PrettyPrinter::print(MemInfo::physical_mem(), + TUnit::BYTES); + } + + LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); + RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker)); + RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::metrics())); + return Status::OK; +} + +void ExecEnv::_init_buffer_pool(int64_t min_page_size, + int64_t capacity, + int64_t clean_pages_limit) { + DCHECK(_buffer_pool == nullptr); + _buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit); + _buffer_reservation = new ReservationTracker(); + _buffer_reservation->InitRootTracker(nullptr, capacity); +} + +void ExecEnv::_destory() { + delete _brpc_stub_cache; + delete _snapshot_loader; + delete _load_stream_mgr; + delete _tablet_writer_mgr; + delete _broker_mgr; + delete _pull_load_task_mgr; + delete _bfd_parser; + delete _tmp_file_mgr; + delete _disk_io_mgr; + delete _load_path_mgr; + delete _etl_job_mgr; + delete _master_info; + delete _fragment_mgr; + delete _cgroups_mgr; + delete _etl_thread_pool; + delete _thread_pool; + delete _thread_mgr; + delete _pool_mem_trackers; + delete _mem_tracker; + delete _broker_client_cache; + delete _frontend_client_cache; + delete _client_cache; + delete _result_mgr; + delete _stream_mgr; + _metrics = nullptr; +} + +void ExecEnv::destroy(ExecEnv* env) { + env->_destory(); +} + +} diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index 146ea1f73f4627..c93fb02637a72f 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -25,7 +25,6 @@ #include "runtime/tuple_row.h" #include "runtime/row_batch.h" #include "util/runtime_profile.h" -#include "util/debug_util.h" #include "util/types.h" #include "exec/local_file_writer.h" #include "exec/broker_writer.h" @@ -89,7 +88,7 @@ Status ExportSink::open(RuntimeState* state) { } Status ExportSink::send(RuntimeState* state, RowBatch* batch) { - VLOG_ROW << "debug: export_sink send batch: " << print_batch(batch); + VLOG_ROW << "debug: export_sink send batch: " << batch->to_string(); SCOPED_TIMER(_profile->total_time_counter()); int num_rows = batch->num_rows(); // we send at most 1024 rows at a time diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9358c18b7a32e4..42efed7f7bac82 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -33,6 +33,7 @@ #include "util/debug_util.h" #include "util/doris_metrics.h" #include "util/thrift_util.h" +#include "runtime/client_cache.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/DataSinks_types.h" diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index fb753a96db6509..8b9a6e72deb5c8 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -126,7 +126,7 @@ bool MemPool::FindChunk(size_t min_size, bool check_limits) { size_t chunk_size = 0; DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE); - if (config::FLAGS_disable_mem_pools) { + if (config::disable_mem_pools) { // Disable pooling by sizing the chunk to fit only this allocation. // Make sure the alignment guarantees are respected. chunk_size = std::max(min_size, alignof(max_align_t)); @@ -252,7 +252,7 @@ bool MemPool::CheckIntegrity(bool check_current_chunk_empty) { DCHECK_LT(current_chunk_idx_, static_cast(chunks_.size())); // Without pooling, there are way too many chunks and this takes too long. - if (config::FLAGS_disable_mem_pools) return true; + if (config::disable_mem_pools) return true; // check that current_chunk_idx_ points to the last chunk with allocated data int64_t total_allocated = 0; diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 8d6db5b5167f4d..23c328391cb7c1 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -113,8 +113,7 @@ class MemPool { /// should be a power-of-two in [1, alignof(std::max_align_t)]. uint8_t* try_allocate_aligned(int64_t size, int alignment) { DCHECK_GE(alignment, 1); - DCHECK_LE(alignment, config::FLAGS_MEMORY_MAX_ALIGNMENT); - //DCHECK_LE(alignment, config::FLAGS_MEMORY_MAX_ALIGNMENT); + DCHECK_LE(alignment, config::memory_max_alignment); DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment); return allocate(size, alignment); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 2fb336005238a1..401b825460e177 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -312,7 +312,7 @@ Status PlanFragmentExecutor::open_internal() { for (int i = 0; i < batch->num_rows(); ++i) { TupleRow* row = batch->get_row(i); - VLOG_ROW << print_row(row, row_desc()); + VLOG_ROW << row->to_string(row_desc()); } } diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 7e431e4f9f28a8..d65e7a575e56a7 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -665,4 +665,13 @@ void RowBatch::add_buffer(BufferPool::ClientHandle* client, _buffers.push_back(std::move(buffer_info)); if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources(); } + +std::string RowBatch::to_string() { + std::stringstream out; + for (int i = 0; i < _num_rows; ++i) { + out << get_row(i)->to_string(_row_desc) << "\n"; + } + return out.str(); +} + } // end namespace doris diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 2ab9a8c2edb0cd..7a71b6b7fc413d 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -427,6 +427,7 @@ class RowBatch : public RowBatchInterface { int max_tuple_buffer_size(); static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024; + std::string to_string(); private: MemTracker* _mem_tracker; // not owned diff --git a/be/src/runtime/test_env.cc b/be/src/runtime/test_env.cc index 4c69edc2b9ae71..42edc97c8c830e 100644 --- a/be/src/runtime/test_env.cc +++ b/be/src/runtime/test_env.cc @@ -31,7 +31,7 @@ TestEnv::TestEnv() { // DorisMetrics::create_metrics(_s_static_metrics.get()); } _exec_env.reset(new ExecEnv()); - _exec_env->init_for_tests(); + // _exec_env->init_for_tests(); _io_mgr_tracker.reset(new MemTracker(-1)); _block_mgr_parent_tracker.reset(new MemTracker(-1)); _exec_env->disk_io_mgr()->init(_io_mgr_tracker.get()); diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp index b9c46893e016d2..0a3fa574567ecd 100644 --- a/be/src/runtime/tuple.cpp +++ b/be/src/runtime/tuple.cpp @@ -26,7 +26,6 @@ #include "runtime/raw_value.h" #include "runtime/tuple_row.h" #include "runtime/string_value.h" -#include "util/debug_util.h" namespace doris { @@ -199,4 +198,44 @@ template void Tuple::materialize_exprs(TupleRow* row, const TupleDescript template void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc, const std::vector& materialize_expr_ctxs, MemPool* pool, std::vector* non_null_var_values, int* total_var_len); + +std::string Tuple::to_string(const TupleDescriptor& d) const { + std::stringstream out; + out << "("; + + bool first_value = true; + for (auto slot : d.slots()) { + if (!slot->is_materialized()) { + continue; + } + if (first_value) { + first_value = false; + } else { + out << " "; + } + + if (is_null(slot->null_indicator_offset())) { + out << "null"; + } else { + std::string value_str; + RawValue::print_value( + get_slot(slot->tuple_offset()), + slot->type(), + -1, + &value_str); + out << value_str; + } + } + + out << ")"; + return out.str(); +} + +std::string Tuple::to_string(const Tuple* t, const TupleDescriptor& d) { + if (t == nullptr) { + return "null"; + } + return t->to_string(d); +} + } diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h index ab930a50c7a2cc..7bdda8e8a28cb0 100644 --- a/be/src/runtime/tuple.h +++ b/be/src/runtime/tuple.h @@ -173,6 +173,9 @@ class Tuple { static const char* _s_llvm_class_name; void* get_data() { return this; } + + std::string to_string(const TupleDescriptor& d) const; + static std::string to_string(const Tuple* t, const TupleDescriptor& d); private: void* _data; }; diff --git a/be/src/runtime/tuple_row.cpp b/be/src/runtime/tuple_row.cpp index e422506f152998..41ceb66199755d 100644 --- a/be/src/runtime/tuple_row.cpp +++ b/be/src/runtime/tuple_row.cpp @@ -17,7 +17,24 @@ #include "runtime/tuple_row.h" +#include + namespace doris { const char* TupleRow::_s_llvm_class_name = "class.doris::TupleRow"; + +std::string TupleRow::to_string(const RowDescriptor& d) { + std::stringstream out; + out << "["; + for (int i = 0; i < d.tuple_descriptors().size(); ++i) { + if (i != 0) { + out << " "; + } + out << Tuple::to_string(get_tuple(i), *d.tuple_descriptors()[i]); + } + + out << "]"; + return out.str(); +} + } diff --git a/be/src/runtime/tuple_row.h b/be/src/runtime/tuple_row.h index bbfe0a90684618..6416b9ae406902 100644 --- a/be/src/runtime/tuple_row.h +++ b/be/src/runtime/tuple_row.h @@ -115,6 +115,7 @@ class TupleRow { // For C++/IR interop, we need to be able to look up types by name. static const char* _s_llvm_class_name; + std::string to_string(const RowDescriptor& d); private: Tuple* _tuples[1]; }; diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index e42dd67dbdebe1..63f79b607d00db 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(Service backend_options.cpp backend_service.cpp brpc_service.cpp + http_service.cpp internal_service.cpp ) diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 5e5c56f771a440..f01ce03cc2c919 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -49,6 +49,7 @@ #include "service/backend_options.h" #include "service/backend_service.h" #include "service/brpc_service.h" +#include "service/http_service.h" #include #include "common/resource_tls.h" #include "exec/schema_scanner/frontend_helper.h" @@ -142,14 +143,15 @@ int main(int argc, char** argv) { } // start backend service for the coordinator on be_port - doris::ExecEnv exec_env(paths); - exec_env.set_olap_engine(engine); + auto exec_env = doris::ExecEnv::GetInstance(); + doris::ExecEnv::init(exec_env, paths); + exec_env->set_olap_engine(engine); - doris::FrontendHelper::setup(&exec_env); + doris::FrontendHelper::setup(exec_env); doris::ThriftServer* be_server = nullptr; EXIT_IF_ERROR(doris::BackendService::create_service( - &exec_env, + exec_env, doris::config::be_port, &be_server)); Status status = be_server->start(); @@ -159,7 +161,7 @@ int main(int argc, char** argv) { exit(1); } - doris::BRpcService brpc_service(&exec_env); + doris::BRpcService brpc_service(exec_env); status = brpc_service.start(doris::config::brpc_port); if (!status.ok()) { LOG(ERROR) << "BRPC service did not start correctly, exiting"; @@ -167,18 +169,20 @@ int main(int argc, char** argv) { exit(1); } - status = exec_env.start_services(); + doris::HttpService http_service( + exec_env, doris::config::webserver_port, doris::config::webserver_num_workers); + status = http_service.start(); if (!status.ok()) { - LOG(ERROR) << "Doris Be services did not start correctly, exiting"; + LOG(ERROR) << "Doris Be http service did not start correctly, exiting"; doris::shutdown_logging(); exit(1); } - doris::TMasterInfo* master_info = exec_env.master_info(); + doris::TMasterInfo* master_info = exec_env->master_info(); // start heart beat server doris::ThriftServer* heartbeat_thrift_server; doris::AgentStatus heartbeat_status = doris::create_heartbeat_server( - &exec_env, + exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server, doris::config::heartbeat_service_thread_count, diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp new file mode 100644 index 00000000000000..ece66e6d9c16b5 --- /dev/null +++ b/be/src/service/http_service.cpp @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/http_service.h" + +#include "http/action/checksum_action.h" +#include "http/action/health_action.h" +#include "http/action/meta_action.h" +#include "http/action/metrics_action.h" +#include "http/action/mini_load.h" +#include "http/action/pprof_actions.h" +#include "http/action/reload_tablet_action.h" +#include "http/action/restore_tablet_action.h" +#include "http/action/snapshot_action.h" +#include "http/action/stream_load.h" +#include "http/default_path_handlers.h" +#include "http/download_action.h" +#include "http/ev_http_server.h" +#include "http/http_method.h" +#include "http/monitor_action.h" +#include "http/web_page_handler.h" +#include "runtime/exec_env.h" +#include "runtime/load_path_mgr.h" +#include "util/doris_metrics.h" + +namespace doris { + +HttpService::HttpService(ExecEnv* env, int port, int num_threads) + : _env(env), + _ev_http_server(new EvHttpServer(port, num_threads)), + _web_page_handler(new WebPageHandler(_ev_http_server.get())) { +} + +HttpService::~HttpService() { +} + +Status HttpService::start() { + add_default_path_handlers(_web_page_handler.get(), _env->process_mem_tracker()); + + // register load + _ev_http_server->register_handler( + HttpMethod::PUT, "/api/{db}/{table}/_load", new MiniLoadAction(_env)); + _ev_http_server->register_handler( + HttpMethod::PUT, "/api/{db}/{table}/_stream_load", new StreamLoadAction(_env)); + + // register download action + std::vector allow_paths; + for (auto& path : _env->store_paths()) { + allow_paths.emplace_back(path.path); + } + DownloadAction* download_action = new DownloadAction(_env, allow_paths); + _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); + _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); + + DownloadAction* tablet_download_action = new DownloadAction(_env, allow_paths); + _ev_http_server->register_handler( + HttpMethod::HEAD, "/api/_tablet/_download", tablet_download_action); + _ev_http_server->register_handler( + HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); + + DownloadAction* error_log_download_action = new DownloadAction( + _env, _env->load_path_mgr()->get_load_error_file_dir()); + _ev_http_server->register_handler( + HttpMethod::GET, "/api/_load_error_log", error_log_download_action); + _ev_http_server->register_handler( + HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); + + // Register BE health action + HealthAction* health_action = new HealthAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); + + // register pprof actions + PprofActions::setup(_env, _ev_http_server.get()); + + // register metrics + { + auto action = new MetricsAction(DorisMetrics::metrics()); + _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); + } + + MetaAction* meta_action = new MetaAction(HEADER); + _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); + +#ifndef BE_TEST + // Register BE checksum action + ChecksumAction* checksum_action = new ChecksumAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); + + // Register BE reload tablet action + ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); + + RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(_env); + _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); + + // Register BE snapshot action + SnapshotAction* snapshot_action = new SnapshotAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); +#endif + + RETURN_IF_ERROR(_ev_http_server->start()); + return Status::OK; +} + +} diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h new file mode 100644 index 00000000000000..f79208b0277709 --- /dev/null +++ b/be/src/service/http_service.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +namespace doris { + +class ExecEnv; +class EvHttpServer; +class WebPageHandler; + +// HTTP service for Doris BE +class HttpService { +public: + HttpService(ExecEnv* env, int port, int num_threads); + ~HttpService(); + + Status start(); +private: + ExecEnv* _env; + + std::unique_ptr _ev_http_server; + std::unique_ptr _web_page_handler; +}; + +} diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 13d8670522eeca..6178740c58c475 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -50,6 +50,7 @@ add_library(Util STATIC thrift_util.cpp thrift_client.cpp thrift_server.cpp + stack_util.cpp symbols_util.cpp system_metrics.cpp url_parser.cpp diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp index 1c20d8b054d908..54c92d6d2df5ba 100644 --- a/be/src/util/debug_util.cpp +++ b/be/src/util/debug_util.cpp @@ -24,10 +24,6 @@ #include "common/logging.h" #include "gen_cpp/version.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/tuple_row.h" -#include "runtime/row_batch.h" #include "util/cpu_info.h" #include "gen_cpp/Opcodes_types.h" #include "gen_cpp/types.pb.h" @@ -45,12 +41,6 @@ #define MILLION (THOUSAND * 1000) #define BILLION (MILLION * 1000) -namespace google { -namespace glog_internal_namespace_ { -void DumpStackTraceToString(std::string* stacktrace); -} -} - namespace doris { #define THRIFT_ENUM_OUTPUT_FN_IMPL(E, MAP) \ @@ -134,70 +124,6 @@ std::string print_plan_node_type(const TPlanNodeType::type& type) { return "Invalid plan node type"; } -std::string print_tuple(const Tuple* t, const TupleDescriptor& d) { - if (t == NULL) { - return "null"; - } - - std::stringstream out; - out << "("; - bool first_value = true; - - for (int i = 0; i < d.slots().size(); ++i) { - SlotDescriptor* slot_d = d.slots()[i]; - - if (!slot_d->is_materialized()) { - continue; - } - - if (first_value) { - first_value = false; - } else { - out << " "; - } - - if (t->is_null(slot_d->null_indicator_offset())) { - out << "null"; - } else { - std::string value_str; - RawValue::print_value( - t->get_slot(slot_d->tuple_offset()), - slot_d->type(), - -1, - &value_str); - out << value_str; - } - } - - out << ")"; - return out.str(); -} - -std::string print_row(TupleRow* row, const RowDescriptor& d) { - std::stringstream out; - out << "["; - - for (int i = 0; i < d.tuple_descriptors().size(); ++i) { - if (i != 0) { - out << " "; - } - out << print_tuple(row->get_tuple(i), *d.tuple_descriptors()[i]); - } - - out << "]"; - return out.str(); -} - -std::string print_batch(RowBatch* batch) { - std::stringstream out; - - for (int i = 0; i < batch->num_rows(); ++i) { - out << print_row(batch->get_row(i), batch->row_desc()) << "\n"; - } - - return out.str(); -} - std::string get_build_version(bool compact) { std::stringstream ss; ss << PALO_BUILD_VERSION @@ -222,12 +148,6 @@ std::string get_version_string(bool compact) { return ss.str(); } -std::string get_stack_trace() { - std::string s; - google::glog_internal_namespace_::DumpStackTraceToString(&s); - return s; -} - std::string hexdump(const char* buf, int len) { std::stringstream ss; ss << std::hex << std::uppercase; diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h index d61c9e8404ae32..a9ba08cc790f71 100644 --- a/be/src/util/debug_util.h +++ b/be/src/util/debug_util.h @@ -32,16 +32,8 @@ namespace doris { -class RowDescriptor; -class TupleDescriptor; -class Tuple; -class TupleRow; -class RowBatch; class PUniqueId; -std::string print_tuple(const Tuple* t, const TupleDescriptor& d); -std::string print_row(TupleRow* row, const RowDescriptor& d); -std::string print_batch(RowBatch* batch); std::string print_id(const TUniqueId& id); std::string print_id(const PUniqueId& id); std::string print_plan_node_type(const TPlanNodeType::type& type); @@ -63,11 +55,6 @@ std::string get_build_version(bool compact); // Returns " version " std::string get_version_string(bool compact); -// Returns the stack trace as a string from the current location. -// Note: there is a libc bug that causes this not to work on 64 bit machines -// for recursive calls. -std::string get_stack_trace(); - std::string hexdump(const char* buf, int len); } diff --git a/be/src/util/disk_info.cpp b/be/src/util/disk_info.cpp index 420fdd487a580f..b2ac7dec2e42c9 100644 --- a/be/src/util/disk_info.cpp +++ b/be/src/util/disk_info.cpp @@ -28,8 +28,6 @@ #include #include -#include "util/debug_util.h" - namespace doris { bool DiskInfo::_s_initialized; diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index bebe630c3a937f..2950e3c4f0316d 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -27,7 +27,6 @@ #include #include -#include "util/debug_util.h" #include "util/pretty_printer.h" #include "util/string_parser.hpp" diff --git a/be/src/util/stack_util.cpp b/be/src/util/stack_util.cpp new file mode 100644 index 00000000000000..862c34ce50f1ce --- /dev/null +++ b/be/src/util/stack_util.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/stack_util.h" + +namespace google { +namespace glog_internal_namespace_ { +void DumpStackTraceToString(std::string* stacktrace); +} +} + +namespace doris { + +std::string get_stack_trace() { + std::string s; + google::glog_internal_namespace_::DumpStackTraceToString(&s); + return s; +} + +} diff --git a/be/src/util/stack_util.h b/be/src/util/stack_util.h new file mode 100644 index 00000000000000..082ab905d2f68d --- /dev/null +++ b/be/src/util/stack_util.h @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace doris { + +// Returns the stack trace as a string from the current location. +// Note: there is a libc bug that causes this not to work on 64 bit machines +// for recursive calls. +std::string get_stack_trace(); + +} diff --git a/be/test/exec/olap_table_sink_test.cpp b/be/test/exec/olap_table_sink_test.cpp index 3c506b2d9ecab1..8f4bbab9c5a3df 100644 --- a/be/test/exec/olap_table_sink_test.cpp +++ b/be/test/exec/olap_table_sink_test.cpp @@ -19,16 +19,19 @@ #include +#include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/decimal_value.h" #include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" #include "runtime/row_batch.h" -#include "runtime/tuple_row.h" #include "runtime/runtime_state.h" -#include "runtime/decimal_value.h" +#include "runtime/thread_resource_mgr.h" +#include "runtime/tuple_row.h" #include "service/brpc.h" -#include "util/descriptor_helper.h" +#include "util/brpc_stub_cache.h" #include "util/cpu_info.h" -#include "util/debug_util.h" +#include "util/descriptor_helper.h" namespace doris { namespace stream_load { @@ -41,8 +44,24 @@ class OlapTableSinkTest : public testing::Test { virtual ~OlapTableSinkTest() { } void SetUp() override { k_add_batch_status = Status::OK; + + _env._thread_mgr = new ThreadResourceMgr(); + _env._master_info = new TMasterInfo(); + _env._load_stream_mgr = new LoadStreamMgr(); + _env._brpc_stub_cache = new BrpcStubCache(); + } + void TearDown() override { + delete _env._brpc_stub_cache; + _env._brpc_stub_cache = nullptr; + delete _env._load_stream_mgr; + _env._load_stream_mgr = nullptr; + delete _env._master_info; + _env._master_info = nullptr; + delete _env._thread_mgr; + _env._thread_mgr = nullptr; } private: + ExecEnv _env; }; TDataSink get_data_sink(TDescriptorTable* desc_tbl) { @@ -281,8 +300,8 @@ class TestInternalService : public palo::PInternalService { MemTracker tracker; RowBatch batch(*_row_desc, request->row_batch(), &tracker); for (int i = 0; i < batch.num_rows(); ++i){ - LOG(INFO) << print_row(batch.get_row(i), *_row_desc); - _output_set->emplace(print_row(batch.get_row(i), *_row_desc)); + LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); + _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); } } } @@ -310,11 +329,10 @@ TEST_F(OlapTableSinkTest, normal) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -417,11 +435,10 @@ TEST_F(OlapTableSinkTest, convert) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1024; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -545,11 +562,10 @@ TEST_F(OlapTableSinkTest, convert) { } TEST_F(OlapTableSinkTest, init_fail1) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -604,11 +620,10 @@ TEST_F(OlapTableSinkTest, init_fail1) { } TEST_F(OlapTableSinkTest, init_fail3) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -664,11 +679,10 @@ TEST_F(OlapTableSinkTest, init_fail3) { } TEST_F(OlapTableSinkTest, init_fail4) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -732,11 +746,10 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -828,11 +841,10 @@ TEST_F(OlapTableSinkTest, decimal) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 615e10ad3908ec..e5d04313944613 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -24,9 +24,14 @@ #include #include "exec/schema_scanner/frontend_helper.h" +#include "gen_cpp/HeartbeatService_types.h" #include "http/http_channel.h" #include "http/http_request.h" #include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "runtime/thread_resource_mgr.h" +#include "util/brpc_stub_cache.h" +#include "util/cpu_info.h" #include "util/doris_metrics.h" class mg_connection; @@ -71,14 +76,29 @@ class StreamLoadActionTest : public testing::Test { k_stream_load_plan_status = Status::OK; k_response_str = ""; config::streaming_load_max_mb = 1; + + _env._thread_mgr = new ThreadResourceMgr(); + _env._master_info = new TMasterInfo(); + _env._load_stream_mgr = new LoadStreamMgr(); + _env._brpc_stub_cache = new BrpcStubCache(); + } + void TearDown() override { + delete _env._brpc_stub_cache; + _env._brpc_stub_cache = nullptr; + delete _env._load_stream_mgr; + _env._load_stream_mgr = nullptr; + delete _env._master_info; + _env._master_info = nullptr; + delete _env._thread_mgr; + _env._thread_mgr = nullptr; } private: + ExecEnv _env; }; TEST_F(StreamLoadActionTest, no_auth) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; action.on_header(&request); @@ -92,8 +112,7 @@ TEST_F(StreamLoadActionTest, no_auth) { #if 0 TEST_F(StreamLoadActionTest, no_content_length) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&__env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -107,8 +126,7 @@ TEST_F(StreamLoadActionTest, no_content_length) { TEST_F(StreamLoadActionTest, unknown_encoding) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -124,8 +142,7 @@ TEST_F(StreamLoadActionTest, unknown_encoding) { TEST_F(StreamLoadActionTest, normal) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; @@ -145,8 +162,7 @@ TEST_F(StreamLoadActionTest, normal) { TEST_F(StreamLoadActionTest, put_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; @@ -168,8 +184,7 @@ TEST_F(StreamLoadActionTest, put_fail) { TEST_F(StreamLoadActionTest, commit_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; @@ -189,8 +204,7 @@ TEST_F(StreamLoadActionTest, commit_fail) { TEST_F(StreamLoadActionTest, begin_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; @@ -211,8 +225,7 @@ TEST_F(StreamLoadActionTest, begin_fail) { #if 0 TEST_F(StreamLoadActionTest, receive_failed) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -228,8 +241,7 @@ TEST_F(StreamLoadActionTest, receive_failed) { TEST_F(StreamLoadActionTest, plan_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; diff --git a/env.sh b/env.sh index 9d424cff71ad90..543054552a3adb 100755 --- a/env.sh +++ b/env.sh @@ -24,7 +24,7 @@ fi # include custom environment variables if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then - source ${DORIS_HOME}/custom_env.sh + . ${DORIS_HOME}/custom_env.sh fi # set DORIS_THIRDPARTY diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index de38bf3108c703..df744b0b19a6dd 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -31,29 +31,34 @@ set -e curdir=`dirname "$0"` curdir=`cd "$curdir"; pwd` -if [ ! -f $curdir/vars.sh ]; then - echo "vars.sh is missing". - exit 1 -fi - -export DORIS_HOME=$curdir/../ -export GCC_HOME=$curdir/../palo-toolchain/gcc730 +export DORIS_HOME=$curdir/.. export TP_DIR=$curdir -source $curdir/vars.sh -cd $TP_DIR +# include custom environment variables +if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then + . ${DORIS_HOME}/custom_env.sh +fi -if [ ! -f $TP_DIR/download-thirdparty.sh ]; then +if [[ ! -f ${TP_DIR}/download-thirdparty.sh ]]; then echo "Download thirdparty script is missing". exit 1 fi -mkdir -p $TP_DIR/src -mkdir -p $TP_DIR/installed +if [ ! -f ${TP_DIR}/vars.sh ]; then + echo "vars.sh is missing". + exit 1 +fi +. ${TP_DIR}/vars.sh + +cd $TP_DIR + +# Download thirdparties. +${TP_DIR}/download-thirdparty.sh + export LD_LIBRARY_PATH=$TP_DIR/installed/lib:$LD_LIBRARY_PATH -if [ -f $DORIS_HOME/palo-toolchain/gcc730/bin/gcc ]; then - GCC_HOME=$curdir/../palo-toolchain/gcc730 +if [ -f ${DORIS_TOOLCHAIN}/gcc730/bin/gcc ]; then + GCC_HOME=${DORIS_TOOLCHAIN}/gcc730 export CC=${GCC_HOME}/bin/gcc export CPP=${GCC_HOME}/bin/cpp export CXX=${GCC_HOME}/bin/g++ @@ -63,9 +68,6 @@ else export CXX=g++ fi -# Download thirdparties. -$TP_DIR/download-thirdparty.sh $@ - check_prerequest() { local CMD=$1 local NAME=$2 @@ -241,7 +243,6 @@ build_protobuf() { ./configure --prefix=${TP_INSTALL_DIR} --disable-shared --enable-static --with-zlib=${TP_INSTALL_DIR}/include cd src sed -i 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile - make -j$PARALLEL cd - make -j$PARALLEL && make install } @@ -353,6 +354,7 @@ build_bzip() { check_if_source_exist $BZIP_SOURCE cd $TP_SOURCE_DIR/$BZIP_SOURCE + CFLAGS="-fPIC" make -j$PARALLEL install PREFIX=$TP_INSTALL_DIR } @@ -489,7 +491,7 @@ build_rocksdb() { cd $TP_SOURCE_DIR/$ROCKSDB_SOURCE CFLAGS="-I ${TP_INCLUDE_DIR} -I ${TP_INCLUDE_DIR}/snappy -I ${TP_INCLUDE_DIR}/lz4" CXXFLAGS="-fPIC" LDFLAGS="-static-libstdc++ -static-libgcc" \ - make -j$PARALLEL static_lib + make USE_RTTI=1 -j$PARALLEL static_lib cp librocksdb.a ../../installed/lib/librocksdb.a cp -r include/rocksdb ../../installed/include/ } diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh index dff1a89a510468..772dfb48c793be 100755 --- a/thirdparty/download-thirdparty.sh +++ b/thirdparty/download-thirdparty.sh @@ -26,13 +26,28 @@ set -e curdir=`dirname "$0"` curdir=`cd "$curdir"; pwd` -REPOSITORY_URL=$1 -export DORIS_HOME=$curdir/../ -source $curdir/vars.sh +if [[ -z "${DORIS_HOME}" ]]; then + DORIS_HOME=$curdir/.. +fi + +# include custom environment variables +if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then + . ${DORIS_HOME}/custom_env.sh +fi + +if [[ -z "${TP_DIR}" ]]; then + TP_DIR=$curdir +fi + +if [ ! -f ${TP_DIR}/vars.sh ]; then + echo "vars.sh is missing". + exit 1 +fi +. ${TP_DIR}/vars.sh -mkdir -p $TP_DIR/src -mkdir -p $TP_DIR/installed +mkdir -p ${TP_DIR}/src +mkdir -p ${TP_DIR}/installed download() { local FILENAME=$1 @@ -230,5 +245,5 @@ if [ ! -f $PATCHED_MARK ]; then touch $PATCHED_MARK fi cd - -echo "Finished patching $LZ4_SOURCE" +echo "Finished patching $BRPC_SOURCE"