From adf58744735f76f6d6018281f3b132639695d92b Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:25:08 +0800
Subject: [PATCH 01/19] Open

---
 be/src/common/config.h               |  17 +-
 be/src/service/internal_service.cpp  | 854 +++++++++++++++------------
 be/src/service/internal_service.h    |   9 +-
 be/src/util/doris_metrics.cpp        |  64 ++
 be/src/util/doris_metrics.h          |  43 ++
 be/src/util/priority_thread_pool.hpp |   6 +-
 6 files changed, 612 insertions(+), 381 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 100664fb978130..82addf0086b05a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,8 +35,10 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
-CONF_Int32(brpc_num_threads, "-1");
+// the number of bthreads for brpc, the default value is set to 32
+// brpc bthreads are used only to send and receive network requests;
+// they no longer run any processing logic
+CONF_Int32(brpc_num_threads, "32");
 
 // port to brpc server for single replica load
 CONF_Int32(single_replica_load_brpc_port, "8070");
@@ -385,8 +387,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// BE brpc interfaces are classified into two categories: light and heavy;
+// each category has a different number of threads.
+// threads to handle heavy API interfaces, such as transmit_data/transmit_block etc.
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light API interfaces, such as exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 68718d395628a0..6667bb0da39f19 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -70,7 +70,15 @@ using namespace ErrorCode;
 
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
 
@@ -104,32 +112,58 @@ class NewHttpClosure : public ::google::protobuf::Closure {
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"),
-          _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return _tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, "brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return _light_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(heavy_work_active_threads,
+                         [this]() { return _heavy_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(light_work_active_threads,
+                         [this]() { return _light_work_pool.get_active_threads(); });
+
+    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
+                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
+                         []() { return config::brpc_light_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(heavy_work_max_threads,
+                         []() { return config::brpc_heavy_work_pool_threads; });
+    REGISTER_HOOK_METRIC(light_work_max_threads,
+                         []() { return config::brpc_light_work_pool_threads; });
+
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter));
 }
 
 PInternalServiceImpl::~PInternalServiceImpl() {
-    DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
+    DEREGISTER_HOOK_METRIC(light_work_active_threads);
+
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
+    DEREGISTER_HOOK_METRIC(light_work_max_threads);
+
     CHECK_EQ(0, bthread_key_delete(btls_key));
     CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
 }
 
-void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* controller,
                                          const PTransmitDataParams* request,
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {}
 
-void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* controller,
                                                  const PEmptyRequest* request,
                                                  PTransmitDataResult* response,
                                                  google::protobuf::Closure* done) {}
 
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done,
@@ -139,22 +173,25 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* request,
                                               PTabletWriterOpenResult* response,
                                               google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
-             << ", txn_id=" << request->txn_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->open(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id()
", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); - } - st.to_protobuf(response->mutable_status()); + _light_work_pool.offer([this, request, response, done]() { + VLOG_RPC << "tablet writer open, id=" << request->id() + << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->open(*request); + if (!st.ok()) { + LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() + << ", index_id=" << request->index_id() + << ", txn_id=" << request->txn_id(); + } + st.to_protobuf(response->mutable_status()); + }); } -void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); + auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); @@ -168,67 +205,75 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - exec_plan_fragment(cntl_base, request, response, done); + DorisMetrics::instance()->exec_plan_fragment_prepare->increment(1); + _light_work_pool.offer([this, controller, request, response, done]() { + exec_plan_fragment(controller, request, response, done); + }); } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->fragment_mgr()->start_query_execution(request); - st.to_protobuf(result->mutable_status()); + DorisMetrics::instance()->exec_plan_fragment_start->increment(1); + _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); + }); } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure(done); - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_block(request, cntl); - - _tablet_writer_add_block(cntl_base, request, response, new_done); + DorisMetrics::instance()->tablet_writer_add_block->increment(1); + 
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTabletWriterAddBlockRequest>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+
+        _tablet_writer_add_block(controller, request, response, new_done);
+    });
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
-    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-                                                                                       cntl);
-    if (st.ok()) {
-        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
-    } else {
-        st.to_protobuf(response->mutable_status());
-    }
+    DorisMetrics::instance()->tablet_writer_add_block_by_http->increment(1);
+    _heavy_work_pool.offer([this, controller, response, done]() {
+        PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
+                new_request, cntl);
+        if (st.ok()) {
+            _tablet_writer_add_block(controller, new_request, response, new_done);
+        } else {
+            st.to_protobuf(response->mutable_status());
+        }
+    });
 }
 
-void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller,
                                                     const PTabletWriterAddBlockRequest* request,
                                                     PTabletWriterAddBlockResult* response,
                                                     google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer add block, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
-             << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
     int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() {
+    _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << st
@@ -247,15 +292,18 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
                                                 const PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* response,
                                                 google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
-             << ", sender_id=" << request->sender_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->cancel(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
-                     << ", index_id=" << request->index_id()
-                     << ", sender_id=" << request->sender_id();
-    }
+    DorisMetrics::instance()->tablet_writer_cancel->increment(1);
+    _light_work_pool.offer([this, controller, request, response,
+                            done]() {
+        VLOG_RPC << "tablet writer cancel, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->cancel(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", sender_id=" << request->sender_id();
+        }
+    });
 }
 
 Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
@@ -286,125 +334,134 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
     }
 }
 
-void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller,
                                                 const PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
-    }
-
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
+    DorisMetrics::instance()->cancel_plan_fragment->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
+        // TODO: the logic seems useless, cancel only return Status::OK. remove it
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller,
                                       const PFetchDataRequest* request, PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    DorisMetrics::instance()->fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
                                               const PFetchTableSchemaRequest* request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) {
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    DorisMetrics::instance()->fetch_table_schema->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        VLOG_RPC << "fetch table schema";
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
+        IOContext io_ctx;
+        FileCacheStatistics file_cache_statis;
+        io_ctx.file_cache_stats = &file_cache_statis;
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(
+                    new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots,
+                                                       &io_ctx));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
-    IOContext io_ctx;
-    FileCacheStatistics file_cache_statis;
-    io_ctx.file_cache_stats = &file_cache_statis;
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(
-                new vectorized::NewJsonReader(profile.get(), params, range, file_slots, &io_ctx));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
-    }
-    st.to_protobuf(result->mutable_status());
+    });
 }
 
 Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
@@ -423,200 +480,233 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
                                              const PTabletKeyLookupRequest* request,
                                              PTabletKeyLookupResponse* response,
                                              google::protobuf::Closure* done) {
-    [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-    brpc::ClosureGuard guard(done);
-    Status st = _tablet_fetch_data(request, response);
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->tablet_fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        brpc::ClosureGuard guard(done);
+        Status st = _tablet_fetch_data(request, response);
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
                                     const PProxyRequest* request, PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), &partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), &partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
-    }
-    Status::OK().to_protobuf(response->mutable_status());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    DorisMetrics::instance()->update_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
                                        const PFetchCacheRequest* request, PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    DorisMetrics::instance()->fetch_cache->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller,
                                        const PClearCacheRequest* request, PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    DorisMetrics::instance()->clear_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller,
                                         const ::doris::PMergeFilterRequest* request,
                                         ::doris::PMergeFilterResponse* response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->merge_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller,
                                         const ::doris::PPublishFilterRequest* request,
                                         ::doris::PPublishFilterResponse* response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->apply_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
                                      const PSendDataRequest* request, PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
-            row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
-        }
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->send_data->increment(1);
+    _heavy_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            for (int i = 0; i < request->data_size(); ++i) {
+                PDataRow* row = new PDataRow();
+                row->CopyFrom(request->data(i));
+                pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
+                                       sizeof(row) + row->ByteSizeLong());
+            }
+            response->mutable_status()->set_status_code(0);
+        }
+    });
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->finish();
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->commit->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->finish();
+            response->mutable_status()->set_status_code(0);
+        }
+    });
 }
 
 void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
                                     const PRollbackRequest* request, PRollbackResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->cancel("rollback");
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->rollback->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->cancel("rollback");
+            response->mutable_status()->set_status_code(0);
+        }
+    });
 }
 
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller,
                                               const PConstantExprRequest* request,
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    DorisMetrics::instance()->fold_constant_expr->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
 
-    Status st = Status::OK();
-    if (request->has_request()) {
-        st = _fold_constant_expr(request->request(), response);
-    } else {
-        // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
-        st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
-    }
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
-    }
-    st.to_protobuf(response->mutable_status());
+        Status st = Status::OK();
+        if (request->has_request()) {
+            st = _fold_constant_expr(request->request(), response);
+        } else {
+            // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
+            st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
@@ -631,31 +721,38 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
-
-    _transmit_block(cntl_base, request, response, new_done, Status::OK());
+    DorisMetrics::instance()->transmit_block->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+
+        _transmit_block(controller, request, response, new_done, Status::OK());
+    });
 }
 
-void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* response,
                                                   google::protobuf::Closure* done) {
-    PTransmitDataParams* new_request = new PTransmitDataParams();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTransmitDataParams>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
-    _transmit_block(cntl_base, new_request, response, new_done, st);
+    DorisMetrics::instance()->transmit_block_by_http->increment(1);
+    _heavy_work_pool.offer([this, controller, response, done]() {
+        PTransmitDataParams* new_request = new PTransmitDataParams();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTransmitDataParams>(new_request, done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st =
+                attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
+        _transmit_block(controller, new_request, response, new_done, st);
+    });
 }
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done,
@@ -693,75 +790,86 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* request,
                                             PCheckRPCChannelResponse* response,
                                             google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
-            std::stringstream ss;
-            ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
-            response->mutable_status()->add_error_msgs(ss.str());
-            response->mutable_status()->set_status_code(1);
-        }
-    }
+    DorisMetrics::instance()->check_rpc_channel->increment(1);
+    _light_work_pool.offer([controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->data().size() != request->size()) {
+            std::stringstream ss;
+            ss << "data size not same, expected: " << request->size()
+               << ", actual: " << request->data().size();
+            response->mutable_status()->add_error_msgs(ss.str());
+            response->mutable_status()->set_status_code(1);
+
+        } else {
+            Md5Digest digest;
+            digest.update(static_cast<const void*>(request->data().c_str()),
+                          request->data().size());
+            digest.digest();
+            if (!iequal(digest.hex(), request->md5())) {
+                std::stringstream ss;
+                ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
+                response->mutable_status()->add_error_msgs(ss.str());
+                response->mutable_status()->set_status_code(1);
+            }
+        }
+    });
 }
 
 void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* controller,
                                              const PResetRPCChannelRequest* request,
                                              PResetRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->all()) {
-        int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
-        if (size > 0) {
-            std::vector<std::string> endpoints;
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
-            *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
-        }
-    } else {
-        for (const std::string& endpoint : request->endpoints()) {
-            if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
-                response->mutable_status()->add_error_msgs(endpoint + ": not found.");
-                continue;
-            }
-
-            if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
-                response->add_channels(endpoint);
-            } else {
-                response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
-            }
-        }
-        if (request->endpoints_size() != response->channels_size()) {
-            response->mutable_status()->set_status_code(1);
-        }
-    }
+    DorisMetrics::instance()->reset_rpc_channel->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->all()) {
+            int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+            if (size > 0) {
+                std::vector<std::string> endpoints;
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+                *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
+            }
+        } else {
+            for (const std::string& endpoint : request->endpoints()) {
+                if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+                    response->mutable_status()->add_error_msgs(endpoint + ": not found.");
+                    continue;
+                }
+
+                if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+                    response->add_channels(endpoint);
+                } else {
+                    response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+                }
+            }
+            if (request->endpoints_size() != response->channels_size()) {
+                response->mutable_status()->set_status_code(1);
+            }
+        }
+    });
 }
 
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    if (request->has_hello()) {
-        response->set_hello(request->hello());
-    }
-    response->mutable_status()->set_status_code(0);
+    DorisMetrics::instance()->hand_shake->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        if (request->has_hello()) {
+            response->set_hello(request->hello());
+        }
+        response->mutable_status()->set_status_code(0);
+    });
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request,
         PTabletWriteSlaveResult* response, google::protobuf::Closure* done) {
+    DorisMetrics::instance()->request_slave_tablet_pull_rowset->increment(1);
     brpc::ClosureGuard closure_guard(done);
     RowsetMetaPB rowset_meta_pb = request->rowset_meta();
     std::string rowset_path = request->rowset_path();
@@ -771,8 +879,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    _slave_replica_worker_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
-                                      http_port, token, rowset_path, this]() {
+    _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, http_port,
+                            token, rowset_path, this]() {
         TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -971,14 +1079,17 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    VLOG_CRITICAL
-            << "receive the result of slave replica pull rowset from slave replica. slave server="
-            << request->node_id() << ", is_succeed=" << request->is_succeed()
-            << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
-    StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
-            request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
-    Status::OK().to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->response_slave_tablet_pull_rowset->increment(1);
+    _heavy_work_pool.offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
" + "slave server=" + << request->node_id() << ", is_succeed=" << request->is_succeed() + << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); + StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( + request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); + Status::OK().to_protobuf(response->mutable_status()); + }); } static Status read_by_rowids( @@ -1097,9 +1208,8 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - // Submit task to seperate ThreadPool for avoiding block bthread working pthread - ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool(); - Status submit_st = task_pool->submit_func([request, response, done, this]() { + DorisMetrics::instance()->multiget_data->increment(1); + _light_work_pool.offer([request, response, done, this]() { // multi get data by rowid MonotonicStopWatch watch; watch.start(); @@ -1109,10 +1219,6 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro st.to_protobuf(response->mutable_status()); LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); - if (!submit_st.ok()) { - submit_st.to_protobuf(response->mutable_status()); - done->Run(); - } } } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 103293a7454b81..4c500a245f4edc 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -190,8 +190,13 @@ class PInternalServiceImpl : public PBackendService { private: ExecEnv* _exec_env; - PriorityThreadPool _tablet_worker_pool; - PriorityThreadPool _slave_replica_worker_pool; + + // every brpc service request should put into thread pool + // the reason see issue #16634 + // define the interface for reading and writing data as heavy interface + // otherwise as light interface + PriorityThreadPool _heavy_work_pool; + PriorityThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 5734e6a26ac8d1..662f1b5eca3cf6 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -29,6 +29,39 @@ namespace doris { DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUESTS, "Total fragment requests received."); + +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_data, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_data_by_http, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment_prepare, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment_start, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(cancel_plan_fragment, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_data, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_table_schema, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_open, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_add_block, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_add_block_by_http, MetricUnit::REQUESTS, ""); +DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_cancel, MetricUnit::REQUESTS, ""); 
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(get_info, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(update_cache, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_cache, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(clear_cache, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(merge_filter, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(apply_filter, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_block, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_block_by_http, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(send_data, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(commit, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(rollback, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fold_constant_expr, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(check_rpc_channel, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(reset_rpc_channel, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(hand_shake, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(request_slave_tablet_pull_rowset, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(response_slave_tablet_pull_rowset, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(multiget_data, MetricUnit::REQUESTS, "");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_fetch_data, MetricUnit::REQUESTS, "");
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS);
@@ -197,6 +230,37 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     _server_metric_entity = _metric_registry.register_entity("server");
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data_by_http);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment_prepare);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment_start);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cancel_plan_fragment);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_table_schema);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_open);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_add_block);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_add_block_by_http);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_cancel);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_info);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, update_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clear_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, merge_filter);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, apply_filter);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block_by_http);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, send_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, commit);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, rollback);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fold_constant_expr);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, check_rpc_channel);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, reset_rpc_channel);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hand_shake);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, request_slave_tablet_pull_rowset);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, response_slave_tablet_pull_rowset);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, multiget_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_fetch_data);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 87afed763b0f69..e3331984e7a987 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -45,6 +45,39 @@ namespace doris {
 class DorisMetrics {
 public:
     IntCounter* fragment_requests_total;
+
+    IntCounter* transmit_data;
+    IntCounter* transmit_data_by_http;
+    IntCounter* exec_plan_fragment;
+    IntCounter* exec_plan_fragment_prepare;
+    IntCounter* exec_plan_fragment_start;
+    IntCounter* cancel_plan_fragment;
+    IntCounter* fetch_data;
+    IntCounter* fetch_table_schema;
+    IntCounter* tablet_writer_open;
+    IntCounter* tablet_writer_add_block;
+    IntCounter* tablet_writer_add_block_by_http;
+    IntCounter* tablet_writer_cancel;
+    IntCounter* get_info;
+    IntCounter* update_cache;
+    IntCounter* fetch_cache;
+    IntCounter* clear_cache;
+    IntCounter* merge_filter;
+    IntCounter* apply_filter;
+    IntCounter* transmit_block;
+    IntCounter* transmit_block_by_http;
+    IntCounter* send_data;
+    IntCounter* commit;
+    IntCounter* rollback;
+    IntCounter* fold_constant_expr;
+    IntCounter* check_rpc_channel;
+    IntCounter* reset_rpc_channel;
+    IntCounter* hand_shake;
+    IntCounter* request_slave_tablet_pull_rowset;
+    IntCounter* response_slave_tablet_pull_rowset;
+    IntCounter* multiget_data;
+    IntCounter* tablet_fetch_data;
+
     IntCounter* fragment_request_duration_us;
     IntCounter* query_scan_bytes;
     IntCounter* query_scan_rows;
@@ -222,6 +255,16 @@ class DorisMetrics {
     IntCounter* upload_rowset_count;
     IntCounter* upload_fail_count;
 
+    UIntGauge* light_work_pool_queue_size;
+    UIntGauge* heavy_work_pool_queue_size;
+    UIntGauge* heavy_work_active_threads;
+    UIntGauge* light_work_active_threads;
+
+    UIntGauge* heavy_work_pool_max_queue_size;
+    UIntGauge* light_work_pool_max_queue_size;
+    UIntGauge* heavy_work_max_threads;
+    UIntGauge* light_work_max_threads;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 3bbc53788cb37f..7e2b8b5f77d993 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -55,7 +55,7 @@ class PriorityThreadPool {
     // queue exceeds this size, subsequent calls to Offer will block until there is
     // capacity available.
    PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     std::bind(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -101,6 +101,7 @@ class PriorityThreadPool {
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop the worker
     // threads and Join to wait until they are finished.
@@ -136,7 +137,9 @@ class PriorityThreadPool {
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
+                _active_threads++;
                 task.work_function();
+                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -151,6 +154,7 @@ class PriorityThreadPool {
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
+    std::atomic<uint32_t> _active_threads;
 };
 
 } // namespace doris

From 400a1666648e0b2334178f9a8440008f377f8eb3 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:32:53 +0800
Subject: [PATCH 02/19] Update be/src/service/internal_service.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/service/internal_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 6667bb0da39f19..77e65c77f2ae2b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -576,7 +576,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controll
                                        const PClearCacheRequest* request, PCacheResponse* response,
                                        google::protobuf::Closure* done) {
     DorisMetrics::instance()->clear_cache->increment(1);
-    _light_work_pool.offer([this, controller, request, response, done]() {
+    _light_work_pool.offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         _exec_env->result_cache()->clear(request, response);
     });

From 481dbc7714f27861dec72ca1d20f4e385feee141 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:33:08 +0800
Subject: [PATCH 03/19] Update be/src/service/internal_service.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/service/internal_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 77e65c77f2ae2b..822dc171492f52 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -821,7 +821,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
                                              PResetRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
     DorisMetrics::instance()->reset_rpc_channel->increment(1);
-    _light_work_pool.offer([this, controller, request, response, done]() {
+    _light_work_pool.offer([ controller, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
         if (request->all()) {
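The `_active_threads` gauge added in patch 01 is maintained with a bare increment/decrement pair around `task.work_function()`, so the count would leak if a task ever threw. Below is a minimal standalone sketch of an exception-safe variant, assuming nothing beyond `std::atomic`; the `ScopedActiveCounter` type is illustrative only and not part of this patch series.

#include <atomic>
#include <cstdint>
#include <iostream>

// RAII guard: the counter is decremented on every exit path, including
// exceptions, so the active-thread gauge cannot drift upward over time.
class ScopedActiveCounter {
public:
    explicit ScopedActiveCounter(std::atomic<uint32_t>& counter) : _counter(counter) {
        ++_counter;
    }
    ~ScopedActiveCounter() { --_counter; }
    ScopedActiveCounter(const ScopedActiveCounter&) = delete;
    ScopedActiveCounter& operator=(const ScopedActiveCounter&) = delete;

private:
    std::atomic<uint32_t>& _counter;
};

int main() {
    std::atomic<uint32_t> active_threads{0};
    {
        // Mirrors the body of work_thread(): count only while the task runs.
        ScopedActiveCounter guard(active_threads);
        std::cout << "running, active=" << active_threads << std::endl; // prints 1
    }
    std::cout << "idle, active=" << active_threads << std::endl; // prints 0
    return 0;
}

Assuming the submitted work functions never throw (as appears to be the case for these pools), the plain ++/-- in the patch is adequate; the guard form simply makes the invariant explicit.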
From d0b02cde815a8dfbb27d129b2e642c7e2e781bec Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:33:15 +0800
Subject: [PATCH 04/19] Update be/src/service/internal_service.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/service/internal_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 822dc171492f52..d34e33aa74f83d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -857,7 +857,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
     DorisMetrics::instance()->hand_shake->increment(1);
-    _light_work_pool.offer([this, request, response, done]() {
+    _light_work_pool.offer([ request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         if (request->has_hello()) {
             response->set_hello(request->hello());

From a72dbb37095ae4d6ab79f66c19aceab872aa461f Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:33:26 +0800
Subject: [PATCH 05/19] Update be/src/service/internal_service.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/service/internal_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index d34e33aa74f83d..d7647b9b951da3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -791,7 +791,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              PCheckRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
     DorisMetrics::instance()->check_rpc_channel->increment(1);
-    _light_work_pool.offer([controller, request, response, done]() {
+    _light_work_pool.offer([ request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
         if (request->data().size() != request->size()) {

From 54f167d90819f1d636e33d7962063df14140d6b0 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Sat, 11 Feb 2023 23:33:34 +0800
Subject: [PATCH 06/19] Update be/src/service/internal_service.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/service/internal_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a01ce1b3f91972..f51d9668cc52e5 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -377,7 +377,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c PFetchTableSchemaResult* result, google::protobuf::Closure* done) { DorisMetrics::instance()->fetch_table_schema->increment(1); - _light_work_pool.offer([this, controller, request, result, done]() { + _light_work_pool.offer([this, request, result, done]() { VLOG_RPC << "fetch table schema"; brpc::ClosureGuard closure_guard(done); TFileScanRange file_scan_range; From 77586a25347d1c898d52575d6a4722d8fd66330d Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Sat, 11 Feb 2023 23:34:06 +0800 Subject: [PATCH 08/19] Update be/src/service/internal_service.cpp Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- be/src/service/internal_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index f51d9668cc52e5..70c4ed88843b08 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -293,7 +293,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* PTabletWriterCancelResult* response, google::protobuf::Closure* done) { DorisMetrics::instance()->tablet_writer_cancel->increment(1); - _light_work_pool.offer([this, controller, request, response, done]() { + _light_work_pool.offer([this, controller, request, done]() { VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); From 56791c2a6d6b57b090e8e9909f1290d9bc29554b Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Sat, 11 Feb 2023 23:35:37 +0800 Subject: [PATCH 09/19] fix --- be/src/service/internal_service.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 70c4ed88843b08..e0dcbe43766280 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -791,7 +791,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { DorisMetrics::instance()->check_rpc_channel->increment(1); - _light_work_pool.offer([ request, response, done]() { + _light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -821,7 +821,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co PResetRPCChannelResponse* response, google::protobuf::Closure* done) { DorisMetrics::instance()->reset_rpc_channel->increment(1); - _light_work_pool.offer([ controller, request, response, done]() { + _light_work_pool.offer([controller, request, response, done]() { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -857,7 +857,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle PHandShakeResponse* response, google::protobuf::Closure* done) { DorisMetrics::instance()->hand_shake->increment(1); - _light_work_pool.offer([ request, response, done]() { + 
_light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); From ab9b22ddedaa5a35b5a6ded413142f7b9c78f4ad Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Sun, 12 Feb 2023 00:09:09 +0800 Subject: [PATCH 10/19] fix --- be/src/service/internal_service.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e0dcbe43766280..b6710332d39f09 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -293,7 +293,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* PTabletWriterCancelResult* response, google::protobuf::Closure* done) { DorisMetrics::instance()->tablet_writer_cancel->increment(1); - _light_work_pool.offer([this, controller, request, done]() { + _light_work_pool.offer([this, request, done]() { VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -377,7 +377,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c PFetchTableSchemaResult* result, google::protobuf::Closure* done) { DorisMetrics::instance()->fetch_table_schema->increment(1); - _light_work_pool.offer([this, request, result, done]() { + _light_work_pool.offer([request, result, done]() { VLOG_RPC << "fetch table schema"; brpc::ClosureGuard closure_guard(done); TFileScanRange file_scan_range; @@ -556,7 +556,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { DorisMetrics::instance()->update_cache->increment(1); - _light_work_pool.offer([this, controller, request, response, done]() { + _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); }); @@ -566,7 +566,7 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { DorisMetrics::instance()->fetch_cache->increment(1); - _heavy_work_pool.offer([this, controller, request, result, done]() { + _heavy_work_pool.offer([this, request, result, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); }); @@ -821,7 +821,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co PResetRPCChannelResponse* response, google::protobuf::Closure* done) { DorisMetrics::instance()->reset_rpc_channel->increment(1); - _light_work_pool.offer([controller, request, response, done]() { + _light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { From f0fce6e7cb13ca1506a0558ad4a4c2aade6d43c1 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Mon, 13 Feb 2023 22:42:10 +0800 Subject: [PATCH 11/19] fix --- be/src/service/internal_service.cpp | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index b6710332d39f09..8351010833a539 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp 
@@ -377,7 +377,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) {
     DorisMetrics::instance()->fetch_table_schema->increment(1);
-    _light_work_pool.offer([request, result, done]() {
+    _heavy_work_pool.offer([request, result, done]() {
         VLOG_RPC << "fetch table schema";
         brpc::ClosureGuard closure_guard(done);
         TFileScanRange file_scan_range;
@@ -493,7 +493,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
                                     const PProxyRequest* request, PProxyResult* response,
                                     google::protobuf::Closure* done) {
     DorisMetrics::instance()->get_info->increment(1);
-    _light_work_pool.offer([this, request, response, done]() {
+    _heavy_work_pool.offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         // PProxyRequest is defined in gensrc/proto/internal_service.proto
         // Currently it supports 2 kinds of requests:
@@ -691,17 +691,11 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) {
     DorisMetrics::instance()->fold_constant_expr->increment(1);
-    _light_work_pool.offer([this, controller, request, response, done]() {
+    _light_work_pool.offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
         Status st = Status::OK();
-        if (request->has_request()) {
-            st = _fold_constant_expr(request->request(), response);
-        } else {
-            // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
-            st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
-        }
+        st = _fold_constant_expr(request->request(), response);
         if (!st.ok()) {
             LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
         }
@@ -1209,7 +1202,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
                                          const PMultiGetRequest* request,
                                          PMultiGetResponse* response,
                                          google::protobuf::Closure* done) {
     DorisMetrics::instance()->multiget_data->increment(1);
-    _light_work_pool.offer([request, response, done, this]() {
+    _heavy_work_pool.offer([request, response, done, this]() {
         // multi get data by rowid
         MonotonicStopWatch watch;
         watch.start();

From a02249191ab190819a983c4ffd27140bb4b4ce8c Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Mon, 13 Feb 2023 23:15:53 +0800
Subject: [PATCH 12/19] fix

---
 .../admin-manual/maint-monitor/monitor-metrics/metrics.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 3c7b49d6bd3395..0d542431f3f002 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
 |`fragment_thread_pool_queue_size`| | Num | Current length of the query-execution thread pool's wait queue | A value above zero means query threads are exhausted and queries will pile up | P0 |
 |`doris_be_all_rowset_nums`| | Num | Current number of rowsets | | P0 |
 |`doris_be_all_segment_nums`| | Num | Current number of segments | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | Number of threads in the brpc heavy work pool | | P0 |
+|`doris_be_light_work_max_threads`| | Num | Number of threads in the brpc light work pool | | P0 |
+|`doris_be_heavy_work_pool_queue_size`| | Num | Current queue length of the brpc heavy work pool; submissions block once the configured maximum is exceeded | | P0 |
+|`doris_be_light_work_pool_queue_size`| | Num | Current queue length of the brpc light work pool; submissions block once the configured maximum is exceeded | | P0 |
+|`doris_be_heavy_work_active_threads`| | Num | Number of active threads in the brpc heavy work pool | | P0 |
+|`doris_be_light_work_active_threads`| | Num | Number of active threads in the brpc light work pool | | P0 |
 
 ### Machine monitoring
 
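The new gauges pair static limits with live state: the `*_max_threads` values and the configured queue capacity bound each pool, while `*_queue_size` and `*_active_threads` report its current load. The sketch below shows how a monitor might combine them; the struct, the 80% threshold, and the sample values are assumptions made for the example (192 threads and a 10240-slot queue mirror the heavy-pool defaults in be/src/common/config.h), not anything shipped by this patch.

#include <cstdint>
#include <iostream>

// Snapshot of the four gauges exported for one work pool.
struct PoolGauges {
    uint64_t max_threads;
    uint64_t active_threads;
    uint64_t max_queue_size;
    uint64_t queue_size;
};

// A pool is saturated when every worker is busy and the queue is nearly full:
// further offers will block, or be rejected once the maximum is reached.
// The 80% threshold is an arbitrary example, not a Doris default.
bool pool_saturated(const PoolGauges& g) {
    return g.active_threads >= g.max_threads &&
           g.queue_size * 10 >= g.max_queue_size * 8; // queue at least 80% full
}

int main() {
    PoolGauges heavy {192, 192, 10240, 9000};
    std::cout << (pool_saturated(heavy) ? "heavy pool saturated" : "heavy pool ok")
              << std::endl;
    return 0;
}

Sustained saturation of either pool is the condition the brpc_heavy_work_pool_threads and brpc_light_work_pool_threads settings are meant to relieve.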
From 5faa01e35fb776c35e845df78587e33c035fdacd Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Tue, 14 Feb 2023 10:29:30 +0800
Subject: [PATCH 13/19] fix

---
 regression-test/suites/demo_p0/thread_action.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/regression-test/suites/demo_p0/thread_action.groovy b/regression-test/suites/demo_p0/thread_action.groovy
index fe1753c4322253..3dccb482f54b5d 100644
--- a/regression-test/suites/demo_p0/thread_action.groovy
+++ b/regression-test/suites/demo_p0/thread_action.groovy
@@ -51,7 +51,7 @@ suite("thread_action") {
         assertEquals(result[2][0][0], 3)
         assertEquals(result[3][0][0], 4)
     }
-    assertTrue(elapsedMillis < 600)
+    assertTrue(elapsedMillis < 1200)
 
 
     // you can use qt action in thread action, and you **MUST** specify different tag,

From 9da066d838220cc1c0828cbdbc54bb19653575f6 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Tue, 14 Feb 2023 11:33:58 +0800
Subject: [PATCH 14/19] fix

---
 be/src/common/config.h              |  7 ++--
 be/src/service/internal_service.cpp | 27 ------------
 be/src/util/doris_metrics.cpp       | 64 -----------------------------
 be/src/util/doris_metrics.h         | 33 ---------------
 4 files changed, 3 insertions(+), 128 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 82addf0086b05a..bff917659ec64f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,10 +35,9 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to 32
-// brpc only for network service send or accept request
-// no more process any logic
-CONF_Int32(brpc_num_threads, "32");
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
+CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
 CONF_Int32(single_replica_load_brpc_port, "8070");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 8351010833a539..15fc8c70080198 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -209,7 +209,6 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
                                                       const PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* response,
                                                       google::protobuf::Closure* done) {
-    DorisMetrics::instance()->exec_plan_fragment_prepare->increment(1);
     _light_work_pool.offer([this, controller, request, response, done]() {
         exec_plan_fragment(controller, request, response, done);
     });
@@ -219,7 +218,6 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
                                                     const PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* result,
                                                     google::protobuf::Closure* done) {
-    DorisMetrics::instance()->exec_plan_fragment_start->increment(1);
     _light_work_pool.offer([this, controller, request, result, done]() {
         auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
         auto scope = OpentelemetryScope {span};
@@ -233,7 +231,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
-    DorisMetrics::instance()->tablet_writer_add_block->increment(1);
     _heavy_work_pool.offer([this, 
controller, request, response, done]() { // TODO(zxy) delete in 1.2 version google::protobuf::Closure* new_done = new NewHttpClosure(done); @@ -247,7 +244,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->tablet_writer_add_block_by_http->increment(1); _heavy_work_pool.offer([this, controller, response, done]() { PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); google::protobuf::Closure* new_done = @@ -292,7 +288,6 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->tablet_writer_cancel->increment(1); _light_work_pool.offer([this, request, done]() { VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); @@ -338,7 +333,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - DorisMetrics::instance()->cancel_plan_fragment->increment(1); _light_work_pool.offer([this, controller, request, result, done]() { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); auto scope = OpentelemetryScope {span}; @@ -364,7 +358,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - DorisMetrics::instance()->fetch_data->increment(1); _heavy_work_pool.offer([this, controller, request, result, done]() { brpc::Controller* cntl = static_cast(controller); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); @@ -376,7 +369,6 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - DorisMetrics::instance()->fetch_table_schema->increment(1); _heavy_work_pool.offer([request, result, done]() { VLOG_RPC << "fetch table schema"; brpc::ClosureGuard closure_guard(done); @@ -480,7 +472,6 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->tablet_fetch_data->increment(1); _heavy_work_pool.offer([this, controller, request, response, done]() { [[maybe_unused]] brpc::Controller* cntl = static_cast(controller); brpc::ClosureGuard guard(done); @@ -492,7 +483,6 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->get_info->increment(1); _heavy_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto @@ -555,7 +545,6 @@ void 
PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->update_cache->increment(1); _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); @@ -565,7 +554,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - DorisMetrics::instance()->fetch_cache->increment(1); _heavy_work_pool.offer([this, request, result, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); @@ -575,7 +563,6 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->clear_cache->increment(1); _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); @@ -586,7 +573,6 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - DorisMetrics::instance()->merge_filter->increment(1); _light_work_pool.offer([this, controller, request, response, done]() { brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); @@ -603,7 +589,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - DorisMetrics::instance()->apply_filter->increment(1); _light_work_pool.offer([this, controller, request, response, done]() { brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); @@ -621,7 +606,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->send_data->increment(1); _heavy_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; @@ -647,7 +631,6 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->commit->increment(1); _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; @@ -668,7 +651,6 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - 
DorisMetrics::instance()->rollback->increment(1); _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; @@ -690,7 +672,6 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->fold_constant_expr->increment(1); _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); Status st = Status::OK(); @@ -718,7 +699,6 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->transmit_block->increment(1); _heavy_work_pool.offer([this, controller, request, response, done]() { // TODO(zxy) delete in 1.2 version google::protobuf::Closure* new_done = new NewHttpClosure(done); @@ -733,7 +713,6 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->transmit_block_by_http->increment(1); _heavy_work_pool.offer([this, controller, response, done]() { PTransmitDataParams* new_request = new PTransmitDataParams(); google::protobuf::Closure* new_done = @@ -783,7 +762,6 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->check_rpc_channel->increment(1); _light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); @@ -813,7 +791,6 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->reset_rpc_channel->increment(1); _light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); @@ -849,7 +826,6 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->hand_shake->increment(1); _light_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { @@ -862,7 +838,6 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle void PInternalServiceImpl::request_slave_tablet_pull_rowset( google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request, PTabletWriteSlaveResult* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->request_slave_tablet_pull_rowset->increment(1); brpc::ClosureGuard closure_guard(done); RowsetMetaPB rowset_meta_pb = request->rowset_meta(); std::string rowset_path = request->rowset_path(); @@ -1072,7 +1047,6 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote void PInternalServiceImpl::response_slave_tablet_pull_rowset( google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request, PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) { - 
DorisMetrics::instance()->response_slave_tablet_pull_rowset->increment(1); _heavy_work_pool.offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. " @@ -1201,7 +1175,6 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - DorisMetrics::instance()->multiget_data->increment(1); _heavy_work_pool.offer([request, response, done, this]() { // multi get data by rowid MonotonicStopWatch watch; diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 662f1b5eca3cf6..5734e6a26ac8d1 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -29,39 +29,6 @@ namespace doris { DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUESTS, "Total fragment requests received."); - -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_data, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_data_by_http, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment_prepare, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(exec_plan_fragment_start, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(cancel_plan_fragment, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_data, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_table_schema, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_open, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_add_block, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_add_block_by_http, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_writer_cancel, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(get_info, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(update_cache, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fetch_cache, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(clear_cache, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(merge_filter, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(apply_filter, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_block, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(transmit_block_by_http, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(send_data, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(commit, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(rollback, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fold_constant_expr, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(check_rpc_channel, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(reset_rpc_channel, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(hand_shake, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(request_slave_tablet_pull_rowset, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(response_slave_tablet_pull_rowset, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(multiget_data, MetricUnit::REQUESTS, ""); -DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(tablet_fetch_data, 
MetricUnit::REQUESTS, ""); - DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS); @@ -230,37 +197,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { _server_metric_entity = _metric_registry.register_entity("server"); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data_by_http); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment_prepare); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment_start); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cancel_plan_fragment); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_data); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_table_schema); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_open); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_add_block); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_add_block_by_http); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_cancel); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_info); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, update_cache); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_cache); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clear_cache); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, merge_filter); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, apply_filter); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block_by_http); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, send_data); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, commit); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, rollback); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fold_constant_expr); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, check_rpc_channel); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, reset_rpc_channel); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hand_shake); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, request_slave_tablet_pull_rowset); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, response_slave_tablet_pull_rowset); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, multiget_data); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_fetch_data); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index e3331984e7a987..4982862035550f 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -45,39 +45,6 @@ namespace doris { class DorisMetrics { public: IntCounter* fragment_requests_total; - - IntCounter* transmit_data; - IntCounter* transmit_data_by_http; - IntCounter* exec_plan_fragment; - IntCounter* exec_plan_fragment_prepare; - IntCounter* exec_plan_fragment_start; - IntCounter* cancel_plan_fragment; - IntCounter* fetch_data; - IntCounter* fetch_table_schema; - 
IntCounter* tablet_writer_open; - IntCounter* tablet_writer_add_block; - IntCounter* tablet_writer_add_block_by_http; - IntCounter* tablet_writer_cancel; - IntCounter* get_info; - IntCounter* update_cache; - IntCounter* fetch_cache; - IntCounter* clear_cache; - IntCounter* merge_filter; - IntCounter* apply_filter; - IntCounter* transmit_block; - IntCounter* transmit_block_by_http; - IntCounter* send_data; - IntCounter* commit; - IntCounter* rollback; - IntCounter* fold_constant_expr; - IntCounter* check_rpc_channel; - IntCounter* reset_rpc_channel; - IntCounter* hand_shake; - IntCounter* request_slave_tablet_pull_rowset; - IntCounter* response_slave_tablet_pull_rowset; - IntCounter* multiget_data; - IntCounter* tablet_fetch_data; - IntCounter* fragment_request_duration_us; IntCounter* query_scan_bytes; IntCounter* query_scan_rows; From f82eec2da6b2ddda52725beff707d7b9354609bb Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Wed, 15 Feb 2023 15:31:40 +0800 Subject: [PATCH 15/19] fix --- regression-test/suites/demo_p0/thread_action.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/demo_p0/thread_action.groovy b/regression-test/suites/demo_p0/thread_action.groovy index 3dccb482f54b5d..fe1753c4322253 100644 --- a/regression-test/suites/demo_p0/thread_action.groovy +++ b/regression-test/suites/demo_p0/thread_action.groovy @@ -51,7 +51,7 @@ suite("thread_action") { assertEquals(result[2][0][0], 3) assertEquals(result[3][0][0], 4) } - assertTrue(elapsedMillis < 1200) + assertTrue(elapsedMillis < 600) // you can use qt action in thread action, and you **MUST** specify different tag, From 4014c7efd1156eb80d55611c4279ded0ad869044 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Fri, 17 Feb 2023 10:31:25 +0800 Subject: [PATCH 16/19] fix --- be/src/service/internal_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 15fc8c70080198..96f980634995b3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -12,7 +12,7 @@ // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the -// specific language governing permissions and limitations +// specific language governing permissions and limitations // under the License. #include "service/internal_service.h" From 6551c5e1f366b380d56f568a20225fa9486c955e Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Fri, 17 Feb 2023 13:08:18 +0800 Subject: [PATCH 17/19] fix --- be/src/service/internal_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 96f980634995b3..15fc8c70080198 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -12,7 +12,7 @@ // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the -// specific language governing permissions and limitations +// specific language governing permissions and limitations // under the License. 
#include "service/internal_service.h" From 6a5e83d034553855731870ff6b77888bb8afc464 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Mon, 20 Feb 2023 16:24:55 +0800 Subject: [PATCH 18/19] fix --- be/src/service/internal_service.cpp | 229 ++++++++++++++++++++++++---- 1 file changed, 199 insertions(+), 30 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 15fc8c70080198..e958c9faaa8e80 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -173,7 +173,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, response, done]() { + bool ret = _light_work_pool.offer([this, request, response, done]() { VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -185,6 +185,12 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c } st.to_protobuf(response->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, @@ -209,29 +215,41 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, controller, request, response, done]() { + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { exec_plan_fragment(controller, request, response, done); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - _light_work_pool.offer([this, controller, request, result, done]() { + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); st.to_protobuf(result->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, controller, request, response, done]() { + bool ret = 
_heavy_work_pool.offer([this, controller, request, response, done]() { // TODO(zxy) delete in 1.2 version google::protobuf::Closure* new_done = new NewHttpClosure(done); brpc::Controller* cntl = static_cast(controller); @@ -239,12 +257,18 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll _tablet_writer_add_block(controller, request, response, new_done); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, controller, response, done]() { + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); google::protobuf::Closure* new_done = new NewHttpClosure(new_request, done); @@ -257,6 +281,12 @@ void PInternalServiceImpl::tablet_writer_add_block_by_http( st.to_protobuf(response->mutable_status()); } }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, @@ -264,7 +294,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { int64_t submit_task_time_ns = MonotonicNanos(); - _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; @@ -282,13 +312,19 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, done]() { + bool ret = _light_work_pool.offer([this, request, done]() { VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -299,6 +335,10 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* << ", sender_id=" << request->sender_id(); } }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + } } 
Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, @@ -333,7 +373,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - _light_work_pool.offer([this, controller, request, result, done]() { + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); @@ -353,23 +393,35 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* // TODO: the logic seems useless, cancel only return Status::OK. remove it st.to_protobuf(result->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, controller, request, result, done]() { + bool ret = _heavy_work_pool.offer([this, controller, request, result, done]() { brpc::Controller* cntl = static_cast(controller); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - _heavy_work_pool.offer([request, result, done]() { + bool ret = _heavy_work_pool.offer([request, result, done]() { VLOG_RPC << "fetch table schema"; brpc::ClosureGuard closure_guard(done); TFileScanRange file_scan_range; @@ -454,6 +506,12 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c } st.to_protobuf(result->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request, @@ -472,18 +530,24 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, controller, request, response, done]() { + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { [[maybe_unused]] brpc::Controller* cntl = static_cast(controller); brpc::ClosureGuard guard(done); Status st = _tablet_fetch_data(request, response); st.to_protobuf(response->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + 
response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, request, response, done]() { + bool ret = _heavy_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -540,40 +604,61 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, } Status::OK().to_protobuf(response->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, response, done]() { + bool ret = _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, request, result, done]() { + bool ret = _heavy_work_pool.offer([this, request, result, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, response, done]() { + bool ret = _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - _light_work_pool.offer([this, controller, request, response, done]() { + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); @@ -583,13 +668,19 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr } st.to_protobuf(response->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to 
the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - _light_work_pool.offer([this, controller, request, response, done]() { + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); @@ -601,12 +692,18 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr } st.to_protobuf(response->mutable_status()); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - _heavy_work_pool.offer([this, request, response, done]() { + bool ret = _heavy_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -626,12 +723,18 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller response->mutable_status()->set_status_code(0); } }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, response, done]() { + bool ret = _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -646,12 +749,18 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, response->mutable_status()->set_status_code(0); } }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - _light_work_pool.offer([this, request, response, done]() { + bool ret = _light_work_pool.offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -666,13 +775,19 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, 
 void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller,
                                               const PConstantExprRequest* request,
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) {
-    _light_work_pool.offer([this, request, response, done]() {
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         Status st = Status::OK();
         st = _fold_constant_expr(request->request(), response);
@@ -681,6 +796,12 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c
         }
         st.to_protobuf(response->mutable_status());
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
@@ -699,7 +820,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    _heavy_work_pool.offer([this, controller, request, response, done]() {
+    bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() {
         // TODO(zxy) delete in 1.2 version
         google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
@@ -707,13 +828,19 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
         _transmit_block(controller, request, response, new_done, Status::OK());
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* response,
                                                   google::protobuf::Closure* done) {
-    _heavy_work_pool.offer([this, controller, response, done]() {
+    bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
         PTransmitDataParams* new_request = new PTransmitDataParams();
         google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(new_request, done);
@@ -722,6 +849,12 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle
         attachment_extract_request_contain_block(new_request, cntl);
         _transmit_block(controller, new_request, response, new_done, st);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
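// A hedged sketch of the caller's view when the heavy pool rejects a
// transmit_block: the brpc call itself succeeds, and the overload shows up only
// in the embedded status. example_transmit_block_caller is an invented helper;
// PBackendService_Stub is the stub generated from internal_service.proto.
void example_transmit_block_caller(PBackendService_Stub& stub,
                                   const PTransmitDataParams& request,
                                   PTransmitDataResult* response) {
    brpc::Controller cntl;
    stub.transmit_block(&cntl, &request, response, nullptr);  // nullptr done => synchronous
    if (cntl.Failed()) {
        return;  // network-level failure: refused connection, RPC timeout, ...
    }
    if (response->status().status_code() == TStatusCode::CANCELLED) {
        // the BE accepted the connection but brpc_heavy_work_pool_max_queue_size
        // was reached: a retryable overload signal, not a data error
    }
}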
@@ -762,7 +895,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* request,
                                              PCheckRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    _light_work_pool.offer([request, response, done]() {
+    bool ret = _light_work_pool.offer([request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
         if (request->data().size() != request->size()) {
@@ -785,13 +918,19 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
             }
         }
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* controller,
                                              const PResetRPCChannelRequest* request,
                                              PResetRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    _light_work_pool.offer([request, response, done]() {
+    bool ret = _light_work_pool.offer([request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
         if (request->all()) {
@@ -820,19 +959,31 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
             }
         }
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    _light_work_pool.offer([request, response, done]() {
+    bool ret = _light_work_pool.offer([request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         if (request->has_hello()) {
             response->set_hello(request->hello());
         }
         response->mutable_status()->set_status_code(0);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -847,8 +998,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, http_port,
-                            token, rowset_path, this]() {
+    bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
+                                       http_port, token, rowset_path, this]() {
         TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -989,6 +1140,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                     rowset_meta->tablet_id(), node_id, true);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+        return;  // reply is sent by the function-level closure guard; don't overwrite CANCELLED with OK
+    }
     Status::OK().to_protobuf(response->mutable_status());
 }
 
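// A simplified sketch (invented names, details dropped) of the control flow
// request_slave_tablet_pull_rowset uses: the RPC is acknowledged as soon as the
// job is queued, the download runs in the background, and the outcome is
// reported later through response_slave_tablet_pull_rowset. This assumes, as
// the lambda's capture list suggests, that `done` is owned by a function-scope
// closure guard rather than by the lambda; PTabletWriteSlaveResult is the
// response type this RPC carries in internal_service.proto.
void ack_then_pull(PriorityThreadPool& pool, PTabletWriteSlaveResult* response,
                   google::protobuf::Closure* done) {
    brpc::ClosureGuard closure_guard(done);  // replies exactly once, when this scope exits
    bool ret = pool.offer([]() {
        // download the segment files, then notify the master replica
    });
    if (!ret) {
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
        return;  // a second ClosureGuard here would run `done` twice
    }
    Status::OK().to_protobuf(response->mutable_status());
}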
@@ -1047,7 +1204,7 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
-    _heavy_work_pool.offer([request, response, done]() {
+    bool ret = _heavy_work_pool.offer([request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
                          "slave server="
@@ -1057,6 +1214,12 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset(
                 request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
         Status::OK().to_protobuf(response->mutable_status());
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 static Status read_by_rowids(
@@ -1175,7 +1338,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
                                          const PMultiGetRequest* request,
                                          PMultiGetResponse* response,
                                          google::protobuf::Closure* done) {
-    _heavy_work_pool.offer([request, response, done, this]() {
+    bool ret = _heavy_work_pool.offer([request, response, done, this]() {
         // multi get data by rowid
         MonotonicStopWatch watch;
         watch.start();
@@ -1185,6 +1348,12 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
         st.to_protobuf(response->mutable_status());
         LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 }
 
 } // namespace doris

From 92334a01998d2ede51d01a2744ae9abfdd4f6af6 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Mon, 20 Feb 2023 16:25:27 +0800
Subject: [PATCH 19/19] fix

---
 gensrc/proto/internal_service.proto | 1 +
 1 file changed, 1 insertion(+)

diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 4e7cb17e0a0b10..1656311ffa6af6 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -275,6 +275,7 @@ enum PCacheStatus {
     INVALID_KEY_RANGE = 6;
     DATA_OVERDUE = 7;
     EMPTY_DATA = 8;
+    CANCELED = 9;
 };
 
 enum CacheType {
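// A hedged usage sketch for the new CANCELED value above: a consumer of the
// result cache should treat it as a transient "work pool full" signal rather
// than a miss or an error. example_handle_cache_status is an invented helper;
// PFetchCacheResult carries the PCacheStatus set by fetch_cache.
void example_handle_cache_status(const PFetchCacheResult& result) {
    if (result.status() == PCacheStatus::CANCELED) {
        // the BE rejected the job because its work pool queue was full; skip
        // the cache for this query and optionally retry the lookup later
        return;
    }
    // other statuses (DATA_OVERDUE, EMPTY_DATA, ...) keep their existing handling
}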