Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 14 additions & 33 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,45 +407,26 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
}
}

void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() {
_tablet_writer_add_block(controller, request, response, done);
});
if (!ret) {
offer_failed(response, done, _heavy_work_pool);
return;
}
}

void PInternalServiceImpl::tablet_writer_add_block_by_http(
google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request,
PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() {
PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
google::protobuf::Closure* new_done =
new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
new_request, cntl);
if (st.ok()) {
_tablet_writer_add_block(controller, new_request, response, new_done);
} else {
st.to_protobuf(response->mutable_status());
}
});
if (!ret) {
offer_failed(response, done, _heavy_work_pool);
return;
PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
google::protobuf::Closure* new_done =
new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
cntl);
if (st.ok()) {
tablet_writer_add_block(controller, new_request, response, new_done);
} else {
st.to_protobuf(response->mutable_status());
}
}

void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure* done) {
void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure* done) {
int64_t submit_task_time_ns = MonotonicNanos();
bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() {
int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
Expand Down
5 changes: 0 additions & 5 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,6 @@ class PInternalServiceImpl : public PBackendService {
::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
const Status& extract_st);

void _tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure* done);

void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port,
int64_t txn_id, int64_t tablet_id, int64_t node_id,
bool is_succeed);
Expand Down