From ca55f7412f47a4a031b5e8ac53e040ea93aaac4b Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 18 Dec 2023 12:40:00 +0800 Subject: [PATCH] [improve](load) remove extra layer of heavy work pool in tablet_writer_add_block --- be/src/service/internal_service.cpp | 47 +++++++++-------------------- be/src/service/internal_service.h | 5 --- 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 24152c67089178..8d352ba1d53186 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -407,45 +407,26 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con } } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, - const PTabletWriterAddBlockRequest* request, - PTabletWriterAddBlockResult* response, - google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { - _tablet_writer_add_block(controller, request, response, done); - }); - if (!ret) { - offer_failed(response, done, _heavy_work_pool); - return; - } -} - void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() { - PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(controller); - Status st = attachment_extract_request_contain_block( - new_request, cntl); - if (st.ok()) { - _tablet_writer_add_block(controller, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); - } - }); - if (!ret) { - offer_failed(response, done, _heavy_work_pool); - return; + PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); + google::protobuf::Closure* new_done = + new NewHttpClosure(new_request, done); + brpc::Controller* cntl = static_cast(controller); + Status st = attachment_extract_request_contain_block(new_request, + cntl); + if (st.ok()) { + tablet_writer_add_block(controller, new_request, response, new_done); + } else { + st.to_protobuf(response->mutable_status()); } } -void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, - const PTabletWriterAddBlockRequest* request, - PTabletWriterAddBlockResult* response, - google::protobuf::Closure* done) { +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, + const PTabletWriterAddBlockRequest* request, + PTabletWriterAddBlockResult* response, + google::protobuf::Closure* done) { int64_t submit_task_time_ns = MonotonicNanos(); bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 15d121f2f1bb0d..261a3d161dcb75 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -240,11 +240,6 @@ class PInternalServiceImpl : public PBackendService { ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, const Status& extract_st); - void _tablet_writer_add_block(google::protobuf::RpcController* controller, - const PTabletWriterAddBlockRequest* request, - PTabletWriterAddBlockResult* response, - google::protobuf::Closure* done); - void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id, int64_t tablet_id, int64_t node_id, bool is_succeed);