diff --git a/be/src/common/config.h b/be/src/common/config.h
index 100664fb978130..bff917659ec64f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
@@ -385,8 +386,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// BE brpc interfaces are classified into two categories: light and heavy;
+// each category is served by its own thread pool with its own thread count.
+// number of threads that handle heavy interfaces, such as transmit_data/transmit_block
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// number of threads that handle light interfaces, such as exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 68718d395628a0..e958c9faaa8e80 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -70,7 +70,15 @@ using namespace ErrorCode;
 
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
@@ -104,32 +112,58 @@ class NewHttpClosure : public ::google::protobuf::Closure {
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"),
-          _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return _tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, "brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return
_light_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(heavy_work_active_threads, + [this]() { return _heavy_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(light_work_active_threads, + [this]() { return _light_work_pool.get_active_threads(); }); + + REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size, + []() { return config::brpc_heavy_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(light_work_pool_max_queue_size, + []() { return config::brpc_light_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(heavy_work_max_threads, + []() { return config::brpc_heavy_work_pool_threads; }); + REGISTER_HOOK_METRIC(light_work_max_threads, + []() { return config::brpc_light_work_pool_threads; }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter)); } PInternalServiceImpl::~PInternalServiceImpl() { - DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_active_threads); + DEREGISTER_HOOK_METRIC(light_work_active_threads); + + DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_max_threads); + DEREGISTER_HOOK_METRIC(light_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); } -void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) {} -void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) {} -void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -139,22 +173,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() - << ", txn_id=" << request->txn_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->open(*request); - if (!st.ok()) { - LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() - << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + bool ret = _light_work_pool.offer([this, request, response, done]() { + VLOG_RPC << "tablet writer open, id=" << request->id() + << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->open(*request); + if (!st.ok()) { + LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() + << ", index_id=" << request->index_id() + << ", txn_id=" << request->txn_id(); + } + st.to_protobuf(response->mutable_status()); + }); + if 
(!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); + auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); @@ -168,67 +211,95 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - exec_plan_fragment(cntl_base, request, response, done); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + exec_plan_fragment(controller, request, response, done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->fragment_mgr()->start_query_execution(request); - st.to_protobuf(result->mutable_status()); + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure(done); - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_block(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, 
request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure(done); + brpc::Controller* cntl = static_cast(controller); + attachment_transfer_request_block(request, cntl); - _tablet_writer_add_block(cntl_base, request, response, new_done); + _tablet_writer_add_block(controller, request, response, new_done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block_by_http( - google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, + google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(cntl_base); - Status st = attachment_extract_request_contain_block(new_request, - cntl); - if (st.ok()) { - _tablet_writer_add_block(cntl_base, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); + google::protobuf::Closure* new_done = + new NewHttpClosure(new_request, done); + brpc::Controller* cntl = static_cast(controller); + Status st = attachment_extract_request_contain_block( + new_request, cntl); + if (st.ok()) { + _tablet_writer_add_block(controller, new_request, response, new_done); + } else { + st.to_protobuf(response->mutable_status()); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer add block, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add block failed, message=" << st @@ -241,20 +312,32 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); 
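The submit-or-cancel sequence used here recurs in nearly every handler of this patch: offer a closure to the heavy or light pool, and when the offer fails, run `done` through a ClosureGuard and report CANCELLED. A minimal sketch of that shared shape, factored into a helper, is shown below; `offer_or_reject` is a hypothetical name introduced only for illustration and is not part of this patch, and the pool and response types are assumed to be the ones already used in this file.

```cpp
// Sketch only: assumes doris::PriorityThreadPool::offer(std::function<void()>),
// a protobuf response exposing mutable_status(), and the brpc/TStatusCode types
// already used in internal_service.cpp. Hypothetical helper, not in the patch.
template <typename Response>
void offer_or_reject(doris::PriorityThreadPool& pool, google::protobuf::Closure* done,
                     Response* response, std::function<void()> work) {
    if (pool.offer(std::move(work))) {
        return; // the offered closure owns `done` through its own ClosureGuard
    }
    LOG(WARNING) << "fail to offer request to the work pool";
    brpc::ClosureGuard closure_guard(done); // still run `done` so brpc sends a reply
    response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
    response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
}
```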
}); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->cancel(*request); - if (!st.ok()) { - LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() - << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); + bool ret = _light_work_pool.offer([this, request, done]() { + VLOG_RPC << "tablet writer cancel, id=" << request->id() + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->cancel(*request); + if (!st.ok()) { + LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() + << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id(); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); } } @@ -286,125 +369,149 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, } } -void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - - Status st = Status::OK(); - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << request->cancel_reason(); - _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - _exec_env->fragment_mgr()->cancel(tid); + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + TUniqueId tid; + tid.__set_hi(request->finst_id().hi()); + tid.__set_lo(request->finst_id().lo()); + + Status st = Status::OK(); + if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) + << ", reason: " << request->cancel_reason(); + _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); + _exec_env->fragment_mgr()->cancel(tid); + } + // TODO: the logic seems useless, cancel only return Status::OK. 
remove it + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - - // TODO: the logic seems useless, cancel only return Status::OK. remove it - st.to_protobuf(result->mutable_status()); } -void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - brpc::Controller* cntl = static_cast(cntl_base); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + bool ret = _heavy_work_pool.offer([this, controller, request, result, done]() { + brpc::Controller* cntl = static_cast(controller); + GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - VLOG_RPC << "fetch table schema"; - brpc::ClosureGuard closure_guard(done); - TFileScanRange file_scan_range; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); - uint32_t len = request->file_scan_range().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + bool ret = _heavy_work_pool.offer([request, result, done]() { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + + std::unique_ptr reader(nullptr); + std::unique_ptr profile(new RuntimeProfile("FetchTableSchema")); + IOContext io_ctx; + FileCacheStatistics file_cache_statis; + io_ctx.file_cache_stats = &file_cache_statis; + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + // file_slots is no use + std::vector 
file_slots; + reader.reset( + new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx)); + break; + } + case TFileFormatType::FORMAT_PARQUET: { + reader.reset(new vectorized::ParquetReader(params, range, &io_ctx)); + break; + } + case TFileFormatType::FORMAT_ORC: { + std::vector column_names; + reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx)); + break; + } + case TFileFormatType::FORMAT_JSON: { + std::vector file_slots; + reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots, + &io_ctx)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::vector col_names; + std::vector col_types; + st = reader->get_parsed_schema(&col_names, &col_types); if (!st.ok()) { LOG(WARNING) << "fetch table schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } - } - if (file_scan_range.__isset.ranges == false) { - st = Status::InternalError("can not get TFileRangeDesc."); - st.to_protobuf(result->mutable_status()); - return; - } - if (file_scan_range.__isset.params == false) { - st = Status::InternalError("can not get TFileScanRangeParams."); - st.to_protobuf(result->mutable_status()); - return; - } - const TFileRangeDesc& range = file_scan_range.ranges.at(0); - const TFileScanRangeParams& params = file_scan_range.params; - - std::unique_ptr reader(nullptr); - std::unique_ptr profile(new RuntimeProfile("FetchTableSchema")); - IOContext io_ctx; - FileCacheStatistics file_cache_statis; - io_ctx.file_cache_stats = &file_cache_statis; - switch (params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - case TFileFormatType::FORMAT_CSV_GZ: - case TFileFormatType::FORMAT_CSV_BZ2: - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_CSV_DEFLATE: { - // file_slots is no use - std::vector file_slots; - reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx)); - break; - } - case TFileFormatType::FORMAT_PARQUET: { - reader.reset(new vectorized::ParquetReader(params, range, &io_ctx)); - break; - } - case TFileFormatType::FORMAT_ORC: { - std::vector column_names; - reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx)); - break; - } - case TFileFormatType::FORMAT_JSON: { - std::vector file_slots; - reader.reset( - new vectorized::NewJsonReader(profile.get(), params, range, file_slots, &io_ctx)); - break; - } - default: - st = Status::InternalError("Not supported file format in fetch table schema: {}", - params.format_type); - st.to_protobuf(result->mutable_status()); - return; - } - std::vector col_names; - std::vector col_types; - st = reader->get_parsed_schema(&col_names, &col_types); - if (!st.ok()) { - LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } st.to_protobuf(result->mutable_status()); - return; - } - result->set_column_nums(col_names.size()); - for (size_t idx = 0; idx < col_names.size(); ++idx) { - result->add_column_names(col_names[idx]); - } - for (size_t idx = 0; idx < col_types.size(); ++idx) { - PTypeDesc* type_desc = 
result->add_column_types(); - col_types[idx].to_protobuf(type_desc); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request, @@ -423,200 +530,278 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - [[maybe_unused]] brpc::Controller* cntl = static_cast(controller); - brpc::ClosureGuard guard(done); - Status st = _tablet_fetch_data(request, response); - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + [[maybe_unused]] brpc::Controller* cntl = static_cast(controller); + brpc::ClosureGuard guard(done); + Status st = _tablet_fetch_data(request, response); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. 
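The comment above lists the request kinds this proxy serves. For orientation, a caller-side sketch of the first kind (fetching all partition ids of a topic) might look roughly like the following; the `kafka_info`, `brokers`, and `topic` field names are assumptions and are not taken from this patch, while `kafka_meta_result()` and `partition_ids()` mirror the accessors used in the handler code around this point.

```cpp
// Hypothetical caller-side sketch; the kafka_info/brokers/topic fields are assumed.
PProxyRequest request;
PKafkaMetaProxyRequest* meta = request.mutable_kafka_meta_request();
meta->mutable_kafka_info()->set_brokers("broker1:9092");
meta->mutable_kafka_info()->set_topic("example_topic");

PProxyResult result;
// ... issue the get_info RPC through a PBackendService stub, then:
for (int32_t partition_id : result.kafka_meta_result().partition_ids()) {
    LOG(INFO) << "partition id: " << partition_id;
}
```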
+ if (request->has_kafka_meta_request()) { + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else if (!kafka_request.offset_times().empty()) { - // if offset_times() has elements, which means this request is to get offset by timestamp. - std::vector partition_offsets; - Status st = - _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else { - // get partition ids of topic - std::vector partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; } + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - Status::OK().to_protobuf(response->mutable_status()); } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* 
done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->update(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->fetch(request, result); + bool ret = _heavy_work_pool.offer([this, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->clear(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "merge meet error" << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - UniqueId unique_id(request->query_id()); - VLOG_NOTICE << "rpc apply_filter recv"; - Status st = 
_exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "apply filter meet error: " << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filter recv"; + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); - row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + PDataRow* row = new PDataRow(); + row->CopyFrom(request->data(i)); + pipe->append_and_flush(reinterpret_cast(&row), sizeof(row), + sizeof(row) + row->ByteSizeLong()); + } + response->mutable_status()->set_status_code(0); } - response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - 
response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->finish(); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->cancel("rollback"); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel("rollback"); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller, const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - brpc::Controller* cntl = static_cast(cntl_base); - - Status st = Status::OK(); - if (request->has_request()) { + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); st = _fold_constant_expr(request->request(), response); - } else { - // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 - st = _fold_constant_expr(cntl->request_attachment().to_string(), response); - } - if (!st.ok()) { - LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; + if (!st.ok()) { + LOG(WARNING) << "exec fold constant expr failed, 
errmsg=" << st; + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, @@ -631,31 +816,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, return FoldConstantExecutor().fold_constant_vexpr(t_request, response); } -void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure(done); - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_block(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure(done); + brpc::Controller* cntl = static_cast(controller); + attachment_transfer_request_block(request, cntl); - _transmit_block(cntl_base, request, response, new_done, Status::OK()); + _transmit_block(controller, request, response, new_done, Status::OK()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - PTransmitDataParams* new_request = new PTransmitDataParams(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(cntl_base); - Status st = attachment_extract_request_contain_block(new_request, cntl); - _transmit_block(cntl_base, new_request, response, new_done, st); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTransmitDataParams* new_request = new PTransmitDataParams(); + google::protobuf::Closure* new_done = + new NewHttpClosure(new_request, done); + brpc::Controller* cntl = static_cast(controller); + Status st = + attachment_extract_request_contain_block(new_request, cntl); + _transmit_block(controller, new_request, response, new_done, st); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -693,25 +895,34 @@ void 
PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->data().size() != request->size()) { - std::stringstream ss; - ss << "data size not same, expected: " << request->size() - << ", actual: " << request->data().size(); - response->mutable_status()->add_error_msgs(ss.str()); - response->mutable_status()->set_status_code(1); - - } else { - Md5Digest digest; - digest.update(static_cast(request->data().c_str()), request->data().size()); - digest.digest(); - if (!iequal(digest.hex(), request->md5())) { + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->data().size() != request->size()) { std::stringstream ss; - ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + ss << "data size not same, expected: " << request->size() + << ", actual: " << request->data().size(); response->mutable_status()->add_error_msgs(ss.str()); response->mutable_status()->set_status_code(1); + + } else { + Md5Digest digest; + digest.update(static_cast(request->data().c_str()), + request->data().size()); + digest.digest(); + if (!iequal(digest.hex(), request->md5())) { + std::stringstream ss; + ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + response->mutable_status()->add_error_msgs(ss.str()); + response->mutable_status()->set_status_code(1); + } } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } @@ -719,44 +930,60 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->all()) { - int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); - if (size > 0) { - std::vector endpoints; - ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); - ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); - *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; - } - } else { - for (const std::string& endpoint : request->endpoints()) { - if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { - response->mutable_status()->add_error_msgs(endpoint + ": not found."); - continue; + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->all()) { + int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); + if (size > 0) { + std::vector endpoints; + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); + ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); + *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; } + } else { + for (const std::string& endpoint : request->endpoints()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { + 
response->mutable_status()->add_error_msgs(endpoint + ": not found."); + continue; + } - if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { - response->add_channels(endpoint); - } else { - response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { + response->add_channels(endpoint); + } else { + response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + } + } + if (request->endpoints_size() != response->channels_size()) { + response->mutable_status()->set_status_code(1); } } - if (request->endpoints_size() != response->channels_size()) { - response->mutable_status()->set_status_code(1); - } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - if (request->has_hello()) { - response->set_hello(request->hello()); + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + if (request->has_hello()) { + response->set_hello(request->hello()); + } + response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - response->mutable_status()->set_status_code(0); } void PInternalServiceImpl::request_slave_tablet_pull_rowset( @@ -771,8 +998,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( int64_t brpc_port = request->brpc_port(); std::string token = request->token(); int64_t node_id = request->node_id(); - _slave_replica_worker_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, - http_port, token, rowset_path, this]() { + bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, + http_port, token, rowset_path, this]() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash()); if (tablet == nullptr) { @@ -913,6 +1140,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), rowset_meta->tablet_id(), node_id, true); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } Status::OK().to_protobuf(response->mutable_status()); } @@ -971,14 +1204,22 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote void PInternalServiceImpl::response_slave_tablet_pull_rowset( google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request, PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) { - 
brpc::ClosureGuard closure_guard(done); - VLOG_CRITICAL - << "receive the result of slave replica pull rowset from slave replica. slave server=" - << request->node_id() << ", is_succeed=" << request->is_succeed() - << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); - StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( - request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); - Status::OK().to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. " + "slave server=" + << request->node_id() << ", is_succeed=" << request->is_succeed() + << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); + StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( + request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } static Status read_by_rowids( @@ -1097,9 +1338,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - // Submit task to seperate ThreadPool for avoiding block bthread working pthread - ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool(); - Status submit_st = task_pool->submit_func([request, response, done, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, this]() { // multi get data by rowid MonotonicStopWatch watch; watch.start(); @@ -1109,9 +1348,11 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro st.to_protobuf(response->mutable_status()); LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); - if (!submit_st.ok()) { - submit_st.to_protobuf(response->mutable_status()); - done->Run(); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 103293a7454b81..4c500a245f4edc 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -190,8 +190,13 @@ class PInternalServiceImpl : public PBackendService { private: ExecEnv* _exec_env; - PriorityThreadPool _tablet_worker_pool; - PriorityThreadPool _slave_replica_worker_pool; + + // every brpc service request should put into thread pool + // the reason see issue #16634 + // define the interface for reading and writing data as heavy interface + // otherwise as light interface + PriorityThreadPool _heavy_work_pool; + PriorityThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 87afed763b0f69..4982862035550f 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -222,6 +222,16 @@ class DorisMetrics { IntCounter* 
upload_rowset_count;
     IntCounter* upload_fail_count;
 
+    UIntGauge* light_work_pool_queue_size;
+    UIntGauge* heavy_work_pool_queue_size;
+    UIntGauge* heavy_work_active_threads;
+    UIntGauge* light_work_active_threads;
+
+    UIntGauge* heavy_work_pool_max_queue_size;
+    UIntGauge* light_work_pool_max_queue_size;
+    UIntGauge* heavy_work_max_threads;
+    UIntGauge* light_work_max_threads;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 3bbc53788cb37f..7e2b8b5f77d993 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -55,7 +55,7 @@ class PriorityThreadPool {
     // queue exceeds this size, subsequent calls to Offer will block until there is
     // capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     std::bind(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -101,6 +101,7 @@ class PriorityThreadPool {
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop the worker
     // threads and Join to wait until they are finished.
@@ -136,7 +137,9 @@ class PriorityThreadPool {
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
+                _active_threads++;
                 task.work_function();
+                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -151,6 +154,7 @@ class PriorityThreadPool {
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
+    std::atomic<uint32_t> _active_threads;
 };
 
 } // namespace doris
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 3c7b49d6bd3395..0d542431f3f002 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
 |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
 |`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
 |`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数 | | P0 |
+|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数 | | P0 |
+|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池当前队列长度,队列达到最大长度后提交work会阻塞 | | P0 |
+|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池当前队列长度,队列达到最大长度后提交work会阻塞 | | P0 |
+|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数 | | P0 |
+|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数 | | P0 |
 
 ### 机器监控
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 4e7cb17e0a0b10..1656311ffa6af6 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -275,6 +275,7 @@ enum PCacheStatus {
     INVALID_KEY_RANGE = 6;
     DATA_OVERDUE = 7;
     EMPTY_DATA = 8;
+    CANCELED = 9;
 };
 
 enum CacheType {
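The new `CANCELED` value lets the cache RPCs report that the light work pool rejected the task, which is what update_cache/fetch_cache/clear_cache above set instead of a Thrift status code. A hedged caller-side sketch of handling it follows; the fallback policy is illustrative only and not part of this patch.

```cpp
// Sketch: PFetchCacheResult::status() carries the PCacheStatus set by the BE.
void handle_fetch_cache_result(const PFetchCacheResult& result) {
    if (result.status() == PCacheStatus::CANCELED) {
        // The BE could not queue the request (light work pool unavailable);
        // treat it like a cache miss and execute the query normally.
        return;
    }
    // ... handle the other statuses (e.g. DATA_OVERDUE, EMPTY_DATA) as before.
}
```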
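Finally, a self-contained sketch of the PriorityThreadPool additions from be/src/util/priority_thread_pool.hpp above, showing how get_queue_size() and the new get_active_threads() provide the values exported by the heavy/light work pool gauges; the thread count, queue size, and the main() harness are illustrative only.

```cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "util/priority_thread_pool.hpp"

int main() {
    // Same constructor shape the service uses: thread count, max queue size, name.
    doris::PriorityThreadPool pool(/*num_threads=*/4, /*queue_size=*/128, "example");

    for (int i = 0; i < 16; ++i) {
        pool.offer([]() { std::this_thread::sleep_for(std::chrono::milliseconds(50)); });
    }

    // These are the values the new *_queue_size and *_active_threads gauges expose.
    std::cout << "queued: " << pool.get_queue_size()
              << ", active: " << pool.get_active_threads() << std::endl;

    pool.shutdown();
    pool.join();
    return 0;
}
```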