diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 114e7be057e38e..b3ad8869104925 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -398,9 +398,6 @@ set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/li
 add_library(gssapi_krb5 STATIC IMPORTED)
 set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libgssapi_krb5.a)
 
-add_library(hdfs3 STATIC IMPORTED)
-set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)
-
 find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)
 
 if (OS_MACOSX)
@@ -719,12 +716,32 @@ set(COMMON_THIRDPARTY
     # put this after lz4 to avoid using lz4 lib in librdkafka
     librdkafka_cpp
     librdkafka
-    hdfs3
     xml2
     lzma
     simdjson
 )
 
+if (ARCH_AMD64 AND OS_LINUX)
+    add_library(hadoop_hdfs STATIC IMPORTED)
+    set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/hadoop_hdfs/native/libhdfs.a)
+
+    set(COMMON_THIRDPARTY
+        ${COMMON_THIRDPARTY}
+        hadoop_hdfs
+    )
+    add_definitions(-DUSE_HADOOP_HDFS)
+else()
+    add_library(hdfs3 STATIC IMPORTED)
+    set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)
+
+    # TODO: use arm hadoop hdfs to replace this
+    set(COMMON_THIRDPARTY
+        ${COMMON_THIRDPARTY}
+        hdfs3
+    )
+    add_definitions(-DUSE_LIBHDFS3)
+endif()
+
 if (OS_MACOSX)
     set(COMMON_THIRDPARTY
         ${COMMON_THIRDPARTY}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 94cad911592603..91e77f0a9c5bf5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
@@ -395,8 +396,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// be brpc interface is classified into two categories: light and heavy
+// each category has a different thread number
+// threads to handle heavy api interface, such as transmit_data/transmit_block etc
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
@@ -898,8 +906,6 @@ CONF_Int32(segcompaction_threshold_segment_num, "10");
 // The segment whose row number above the threshold will be compacted during segcompaction
 CONF_Int32(segcompaction_small_threshold, "1048576");
 
-CONF_String(jvm_max_heap_size, "1024M");
-
 // enable java udf and jdbc scannode
 CONF_Bool(enable_java_support, "true");
 
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 4096d2557b1b40..2a36d4a3c20bd8 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -34,6 +34,7 @@ set(IO_FILES
     local_file_writer.cpp
     s3_reader.cpp
     s3_writer.cpp
+    fs/err_utils.cpp
    fs/file_system_map.cpp
    fs/local_file_reader.cpp
    fs/local_file_system.cpp
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
new file mode 100644
index 00000000000000..d08ea1bee4f6f2
--- /dev/null
+++ b/be/src/io/fs/err_utils.cpp
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "io/fs/err_utils.h"
+
+#include <fmt/format.h>
+
+#include <sstream>
+
+#include "io/fs/hdfs.h"
+
+namespace doris {
+namespace io {
+
+std::string errno_to_str() {
+    char buf[1024];
+    return fmt::format("({}), {}", errno, strerror_r(errno, buf, 1024));
+}
+
+std::string errcode_to_str(const std::error_code& ec) {
+    return fmt::format("({}), {}", ec.value(), ec.message());
+}
+
+std::string hdfs_error() {
+    std::stringstream ss;
+    char buf[1024];
+    ss << "(" << errno << "), " << strerror_r(errno, buf, 1024);
+#ifdef USE_HADOOP_HDFS
+    char* root_cause = hdfsGetLastExceptionRootCause();
+    if (root_cause != nullptr) {
+        ss << ", reason: " << root_cause;
+    }
+#else
+    ss << ", reason: " << hdfsGetLastError();
+#endif
+    return ss.str();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/err_utils.h b/be/src/io/fs/err_utils.h
new file mode 100644
index 00000000000000..31ca702c32625d
--- /dev/null
+++ b/be/src/io/fs/err_utils.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace doris {
+namespace io {
+
+std::string errno_to_str();
+std::string errcode_to_str(const std::error_code& ec);
+std::string hdfs_error();
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/hdfs.h b/be/src/io/fs/hdfs.h
new file mode 100644
index 00000000000000..eb9e1b2c079573
--- /dev/null
+++ b/be/src/io/fs/hdfs.h
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef USE_HADOOP_HDFS
+#include <hadoop_hdfs/hdfs.h>
+#else
+#include <hdfs/hdfs.h>
+#endif
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index c82af8e299788a..c971173c91be10 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -18,6 +18,7 @@
 #include "io/hdfs_builder.h"
 
 #include 
+#include 
 
 #include 
@@ -26,6 +27,7 @@
 #include "util/string_util.h"
 #include "util/uid_util.h"
 #include "util/url_coding.h"
+
 namespace doris {
 
 Status HDFSCommonBuilder::init_hdfs_builder() {
@@ -36,6 +38,7 @@
         return Status::InternalError(
                 "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml and be.out");
     }
+    hdfsBuilderSetForceNewInstance(hdfs_builder);
     return Status::OK();
 }
 
@@ -54,7 +57,10 @@ Status HDFSCommonBuilder::run_kinit() {
     if (!rc) {
         return Status::InternalError("Kinit failed, errMsg: " + msg);
     }
+#ifdef USE_LIBHDFS3
+    hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
     hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+#endif
     return Status::OK();
 }
 
@@ -97,7 +103,6 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build
     if (hdfsParams.__isset.hdfs_kerberos_principal) {
         builder->need_kinit = true;
         builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
-        hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
     }
     if (hdfsParams.__isset.hdfs_kerberos_keytab) {
         builder->need_kinit = true;
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index eb63fab1b57934..26164ce6ede751 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/file_reader.h"
+#include "io/fs/hdfs.h"
 
 namespace doris {
 
@@ -38,9 +37,12 @@ class HDFSCommonBuilder {
 public:
     HDFSCommonBuilder() = default;
     ~HDFSCommonBuilder() {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+        // for hadoop hdfs, the hdfs_builder will be freed in hdfsConnect
         if (hdfs_builder != nullptr) {
             hdfsFreeBuilder(hdfs_builder);
         }
+#endif
     }
 
     // Must call this to init hdfs_builder first.
@@ -51,7 +53,7 @@ class HDFSCommonBuilder {
     Status run_kinit();
 
 private:
-    hdfsBuilder* hdfs_builder;
+    hdfsBuilder* hdfs_builder = nullptr;
     bool need_kinit {false};
     std::string hdfs_kerberos_keytab;
     std::string hdfs_kerberos_principal;
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 7117ec0e7cf5cd..7e9394d70744ee 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -14,11 +14,13 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
+ #include "io/hdfs_file_reader.h" #include #include +#include "io/fs/err_utils.h" #include "service/backend_options.h" namespace doris { @@ -100,12 +102,12 @@ Status HdfsFileReader::open() { if (_hdfs_fs == nullptr) { return Status::InternalError( "open file failed. (BE: {}) namenode:{}, path:{}, err: {}", - BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError()); + BackendOptions::get_localhost(), _namenode, _path, io::hdfs_error()); } } else { return Status::InternalError("open file failed. (BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _namenode, _path, - hdfsGetLastError()); + io::hdfs_error()); } } VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path; @@ -174,7 +176,7 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r if (loop_read < 0) { return Status::InternalError( "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", - BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError()); + BackendOptions::get_localhost(), _namenode, _path, io::hdfs_error()); } if (loop_read == 0) { break; @@ -192,7 +194,7 @@ int64_t HdfsFileReader::size() { hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str()); if (file_info == nullptr) { return Status::IOError("failed to get path info, path: {}, error: {}", _path, - hdfsGetLastError()); + io::hdfs_error()); } _file_size = file_info->mSize; hdfsFreeFileInfo(file_info, 1); @@ -205,7 +207,7 @@ Status HdfsFileReader::seek(int64_t position) { int res = hdfsSeek(_hdfs_fs, _hdfs_file, position); if (res != 0) { return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}", - BackendOptions::get_localhost(), position, hdfsGetLastError()); + BackendOptions::get_localhost(), position, io::hdfs_error()); } _current_offset = position; return Status::OK(); @@ -223,7 +225,7 @@ Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, hdfsFS* fs) { RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder)); hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); if (hdfs_fs == nullptr) { - return Status::InternalError("connect to hdfs failed. error: {}", hdfsGetLastError()); + return Status::InternalError("connect to hdfs failed. error: {}", io::hdfs_error()); } *fs = hdfs_fs; return Status::OK(); diff --git a/be/src/io/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp index e6814438f10726..69554d82a908d1 100644 --- a/be/src/io/hdfs_writer.cpp +++ b/be/src/io/hdfs_writer.cpp @@ -17,10 +17,14 @@ #include "io/hdfs_writer.h" +#include + #include #include "common/logging.h" +#include "io/fs/err_utils.h" #include "service/backend_options.h" +#include "util/stack_util.h" namespace doris { @@ -56,16 +60,18 @@ Status HDFSWriter::open() { std::filesystem::path hdfs_path(_path); std::string hdfs_dir = hdfs_path.parent_path().string(); + LOG(INFO) << "hdfs write open: " << hdfs_dir << get_stack_trace(); exists = hdfsExists(_hdfs_fs, hdfs_dir.c_str()); if (exists != 0) { - VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir; + LOG(INFO) << "hdfs dir doesn't exist, create it: " << hdfs_dir << ", path: " << _path + << get_stack_trace(); int ret = hdfsCreateDirectory(_hdfs_fs, hdfs_dir.c_str()); if (ret != 0) { std::stringstream ss; ss << "create dir failed. 
" << "(BE: " << BackendOptions::get_localhost() << ")" << " namenode: " << _namenode << " path: " << hdfs_dir - << ", err: " << hdfsGetLastError(); + << ", err: " << io::hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } @@ -76,7 +82,7 @@ Status HDFSWriter::open() { std::stringstream ss; ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")" - << " namenode:" << _namenode << " path:" << _path << ", err: " << hdfsGetLastError(); + << " namenode:" << _namenode << " path:" << _path << ", err: " << io::hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } @@ -94,7 +100,7 @@ Status HDFSWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_len std::stringstream ss; ss << "write file failed. " << "(BE: " << BackendOptions::get_localhost() << ")" - << "namenode:" << _namenode << " path:" << _path << ", err: " << hdfsGetLastError(); + << "namenode:" << _namenode << " path:" << _path << ", err: " << io::hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } @@ -121,7 +127,7 @@ Status HDFSWriter::close() { std::stringstream ss; ss << "failed to flush hdfs file. " << "(BE: " << BackendOptions::get_localhost() << ")" - << "namenode:" << _namenode << " path:" << _path << ", err: " << hdfsGetLastError(); + << "namenode:" << _namenode << " path:" << _path << ", err: " << io::hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } @@ -135,11 +141,12 @@ Status HDFSWriter::close() { Status HDFSWriter::_connect() { HDFSCommonBuilder builder; - RETURN_IF_ERROR(createHDFSBuilder(_properties, &builder)); + THdfsParams hdfsParams = parse_properties(_properties); + RETURN_IF_ERROR(createHDFSBuilder(hdfsParams, &builder)); _hdfs_fs = hdfsBuilderConnect(builder.get()); if (_hdfs_fs == nullptr) { return Status::InternalError("connect to hdfs failed. 
namenode address:{}, error {}", - _namenode, hdfsGetLastError()); + _namenode, io::hdfs_error()); } return Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1f658da09cf304..6db5a5566872e9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -61,7 +61,15 @@ using namespace ErrorCode; const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT); + +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); bthread_key_t btls_key; @@ -95,16 +103,42 @@ class NewHttpClosure : public ::google::protobuf::Closure { PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), - _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"), - _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240, - "replica_download") { - REGISTER_HOOK_METRIC(add_batch_task_queue_size, - [this]() { return _tablet_worker_pool.get_queue_size(); }); + _heavy_work_pool(config::brpc_heavy_work_pool_threads, + config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"), + _light_work_pool(config::brpc_light_work_pool_threads, + config::brpc_light_work_pool_max_queue_size, "brpc_light") { + REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, + [this]() { return _heavy_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(light_work_pool_queue_size, + [this]() { return _light_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(heavy_work_active_threads, + [this]() { return _heavy_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(light_work_active_threads, + [this]() { return _light_work_pool.get_active_threads(); }); + + REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size, + []() { return config::brpc_heavy_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(light_work_pool_max_queue_size, + []() { return config::brpc_light_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(heavy_work_max_threads, + []() { return config::brpc_heavy_work_pool_threads; }); + REGISTER_HOOK_METRIC(light_work_max_threads, + []() { return config::brpc_light_work_pool_threads; }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); } PInternalServiceImpl::~PInternalServiceImpl() { - DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_active_threads); + DEREGISTER_HOOK_METRIC(light_work_active_threads); + + DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_max_threads); + DEREGISTER_HOOK_METRIC(light_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); } @@ -132,7 +166,7 @@ void 
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController _transmit_data(cntl_base, new_request, response, new_done, st); } -void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -170,22 +204,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() - << ", txn_id=" << request->txn_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->open(*request); - if (!st.ok()) { - LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() - << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + VLOG_RPC << "tablet writer open, id=" << request->id() + << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->open(*request); + if (!st.ok()) { + LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() + << ", index_id=" << request->index_id() + << ", txn_id=" << request->txn_id(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); + auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); @@ -199,67 +242,95 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - exec_plan_fragment(cntl_base, request, response, done); + bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + exec_plan_fragment(controller, request, response, done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const 
PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->fragment_mgr()->start_query_execution(request); - st.to_protobuf(result->mutable_status()); + bool ret = _light_work_pool.try_offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure(done); - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_block(request, cntl); + bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure(done); + brpc::Controller* cntl = static_cast(controller); + attachment_transfer_request_block(request, cntl); - _tablet_writer_add_block(cntl_base, request, response, new_done); + _tablet_writer_add_block(controller, request, response, new_done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block_by_http( - google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, + google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(cntl_base); - Status st = attachment_extract_request_contain_block(new_request, - cntl); - if (st.ok()) { - _tablet_writer_add_block(cntl_base, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() { + PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); + google::protobuf::Closure* new_done = + new NewHttpClosure(new_request, done); + brpc::Controller* cntl = static_cast(controller); + Status st = attachment_extract_request_contain_block( + new_request, cntl); + if (st.ok()) { + _tablet_writer_add_block(controller, new_request, response, new_done); + } else { + st.to_protobuf(response->mutable_status()); + } + 
}); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer add block, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add block failed, message=" << st @@ -272,6 +343,12 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, @@ -303,13 +380,13 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { VLOG_RPC << "tablet writer add batch, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); // add batch maybe cost a lot of time, and this callback thread will be held. 
// this will influence query execution, because the pthreads under bthread may be // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, submit_task_time_ns, + this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; @@ -332,20 +409,32 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->cancel(*request); - if (!st.ok()) { - LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() - << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); + bool ret = _light_work_pool.try_offer([this, request, done]() { + VLOG_RPC << "tablet writer cancel, id=" << request->id() + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->cancel(*request); + if (!st.ok()) { + LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() + << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id(); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); } } @@ -377,313 +466,408 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, } } -void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - - Status st; - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << request->cancel_reason(); - st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - st = _exec_env->fragment_mgr()->cancel(tid); - } - if (!st.ok()) { - LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st; + bool ret = _light_work_pool.try_offer([this, controller, request, result, done]() 
{ + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + TUniqueId tid; + tid.__set_hi(request->finst_id().hi()); + tid.__set_lo(request->finst_id().lo()); + + Status st = Status::OK(); + if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) + << ", reason: " << request->cancel_reason(); + _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); + _exec_env->fragment_mgr()->cancel(tid); + } + // TODO: the logic seems useless, cancel only return Status::OK. remove it + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } -void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - brpc::Controller* cntl = static_cast(cntl_base); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() { + brpc::Controller* cntl = static_cast(controller); + GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - VLOG_RPC << "fetch table schema"; - brpc::ClosureGuard closure_guard(done); - TFileScanRange file_scan_range; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); - uint32_t len = request->file_scan_range().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const 
TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + + std::unique_ptr reader(nullptr); + std::unique_ptr profile(new RuntimeProfile("FetchTableSchema")); + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + // file_slots is no use + std::vector file_slots; + reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); + break; + } + case TFileFormatType::FORMAT_PARQUET: { + reader.reset(new vectorized::ParquetReader(params, range)); + break; + } + case TFileFormatType::FORMAT_ORC: { + std::vector column_names; + reader.reset(new vectorized::OrcReader(params, range, column_names, "")); + break; + } + case TFileFormatType::FORMAT_JSON: { + std::vector file_slots; + reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::vector col_names; + std::vector col_types; + st = reader->get_parsed_schema(&col_names, &col_types); if (!st.ok()) { LOG(WARNING) << "fetch table schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } - } - if (file_scan_range.__isset.ranges == false) { - st = Status::InternalError("can not get TFileRangeDesc."); - st.to_protobuf(result->mutable_status()); - return; - } - if (file_scan_range.__isset.params == false) { - st = Status::InternalError("can not get TFileScanRangeParams."); - st.to_protobuf(result->mutable_status()); - return; - } - const TFileRangeDesc& range = file_scan_range.ranges.at(0); - const TFileScanRangeParams& params = file_scan_range.params; - - std::unique_ptr reader(nullptr); - std::unique_ptr profile(new RuntimeProfile("FetchTableSchema")); - switch (params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - case TFileFormatType::FORMAT_CSV_GZ: - case TFileFormatType::FORMAT_CSV_BZ2: - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_CSV_DEFLATE: { - // file_slots is no use - std::vector file_slots; - reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); - break; - } - case TFileFormatType::FORMAT_PARQUET: { - reader.reset(new vectorized::ParquetReader(params, range)); - break; - } - case TFileFormatType::FORMAT_ORC: { - std::vector column_names; - reader.reset(new vectorized::OrcReader(params, range, column_names, "")); - break; - } - case TFileFormatType::FORMAT_JSON: { - std::vector file_slots; - reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots)); - break; - } - default: - st = Status::InternalError("Not supported file format in fetch table schema: {}", - params.format_type); - st.to_protobuf(result->mutable_status()); - return; - } - std::vector col_names; - std::vector col_types; - st = reader->get_parsed_schema(&col_names, &col_types); - if (!st.ok()) { - LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = 
result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } st.to_protobuf(result->mutable_status()); - return; - } - result->set_column_nums(col_names.size()); - for (size_t idx = 0; idx < col_names.size(); ++idx) { - result->add_column_names(col_names[idx]); - } - for (size_t idx = 0; idx < col_types.size(); ++idx) { - PTypeDesc* type_desc = result->add_column_types(); - col_types[idx].to_protobuf(type_desc); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. + if (request->has_kafka_meta_request()) { + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else if (!kafka_request.offset_times().empty()) { - // if offset_times() has elements, which means this request is to get offset by timestamp. 
- std::vector partition_offsets; - Status st = - _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else { - // get partition ids of topic - std::vector partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; } + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - Status::OK().to_protobuf(response->mutable_status()); } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->update(request, response); + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->fetch(request, result); + bool ret = _heavy_work_pool.try_offer([this, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); 
+ }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->clear(request, response); + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); + bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "merge meet error" << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - UniqueId unique_id(request->query_id()); - VLOG_NOTICE << "rpc apply_filter recv"; - Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "apply filter meet error: " << st.to_string(); + bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filter recv"; + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + 
response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - for (int i = 0; i < request->data_size(); ++i) { - std::unique_ptr row(new PDataRow()); - row->CopyFrom(request->data(i)); - Status s = pipe->append(std::move(row)); - if (!s.ok()) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs(s.to_string()); - return; + bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + std::unique_ptr row(new PDataRow()); + row->CopyFrom(request->data(i)); + Status s = pipe->append(std::move(row)); + if (!s.ok()) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs(s.to_string()); + return; + } } + response->mutable_status()->set_status_code(0); } - response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->finish(); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + 
response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->cancel("rollback"); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel("rollback"); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller, const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - brpc::Controller* cntl = static_cast(cntl_base); - - Status st = Status::OK(); - if (request->has_request()) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); st = _fold_constant_expr(request->request(), response); - } else { - // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 - st = _fold_constant_expr(cntl->request_attachment().to_string(), response); - } - if (!st.ok()) { - LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; + if (!st.ok()) { + LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, @@ -700,31 +884,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, return FoldConstantExecutor().fold_constant_vexpr(t_request, response); } 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure(done); - brpc::Controller* cntl = static_cast(cntl_base); - attachment_transfer_request_block(request, cntl); + bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure(done); + brpc::Controller* cntl = static_cast(controller); + attachment_transfer_request_block(request, cntl); - _transmit_block(cntl_base, request, response, new_done, Status::OK()); + _transmit_block(controller, request, response, new_done, Status::OK()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - PTransmitDataParams* new_request = new PTransmitDataParams(); - google::protobuf::Closure* new_done = - new NewHttpClosure(new_request, done); - brpc::Controller* cntl = static_cast(cntl_base); - Status st = attachment_extract_request_contain_block(new_request, cntl); - _transmit_block(cntl_base, new_request, response, new_done, st); + bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() { + PTransmitDataParams* new_request = new PTransmitDataParams(); + google::protobuf::Closure* new_done = + new NewHttpClosure(new_request, done); + brpc::Controller* cntl = static_cast(controller); + Status st = + attachment_extract_request_contain_block(new_request, cntl); + _transmit_block(controller, new_request, response, new_done, st); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -762,25 +963,34 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->data().size() != request->size()) { - std::stringstream ss; - ss << "data size not same, expected: " << request->size() - << ", actual: " << request->data().size(); - response->mutable_status()->add_error_msgs(ss.str()); - response->mutable_status()->set_status_code(1); - - } else { - Md5Digest digest; - digest.update(static_cast(request->data().c_str()), request->data().size()); - digest.digest(); - if 
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done,
@@ -762,25 +963,34 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* request,
                                              PCheckRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->data().size() != request->size()) {
             std::stringstream ss;
-            ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
+            ss << "data size not same, expected: " << request->size()
+               << ", actual: " << request->data().size();
             response->mutable_status()->add_error_msgs(ss.str());
             response->mutable_status()->set_status_code(1);
+
+        } else {
+            Md5Digest digest;
+            digest.update(static_cast<const void*>(request->data().c_str()),
+                          request->data().size());
+            digest.digest();
+            if (!iequal(digest.hex(), request->md5())) {
+                std::stringstream ss;
+                ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
+                response->mutable_status()->add_error_msgs(ss.str());
+                response->mutable_status()->set_status_code(1);
+            }
         }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
     }
 }
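For reference, the md5 comparison above implies the caller computes the digest over the raw payload with the same Md5Digest helper; a hedged sketch of how a client could fill PCheckRPCChannelRequest so that this check passes (the payload value is an assumed example, not code from this patch):

// Illustrative request preparation for check_rpc_channel, mirroring the server-side check.
PCheckRPCChannelRequest request;
std::string payload = "hello doris"; // assumed example payload
request.set_data(payload);
request.set_size(payload.size());
Md5Digest digest;
digest.update(static_cast<const void*>(payload.c_str()), payload.size());
digest.digest();
request.set_md5(digest.hex());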
 
@@ -788,44 +998,60 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
                                              const PResetRPCChannelRequest* request,
                                              PResetRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->all()) {
-        int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
-        if (size > 0) {
-            std::vector<std::string> endpoints;
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
-            *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
-        }
-    } else {
-        for (const std::string& endpoint : request->endpoints()) {
-            if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
-                response->mutable_status()->add_error_msgs(endpoint + ": not found.");
-                continue;
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->all()) {
+            int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+            if (size > 0) {
+                std::vector<std::string> endpoints;
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+                *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
             }
+        } else {
+            for (const std::string& endpoint : request->endpoints()) {
+                if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+                    response->mutable_status()->add_error_msgs(endpoint + ": not found.");
+                    continue;
+                }
 
-            if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
-                response->add_channels(endpoint);
-            } else {
-                response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+                if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+                    response->add_channels(endpoint);
+                } else {
+                    response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+                }
+            }
+            if (request->endpoints_size() != response->channels_size()) {
+                response->mutable_status()->set_status_code(1);
             }
         }
-        if (request->endpoints_size() != response->channels_size()) {
-            response->mutable_status()->set_status_code(1);
-        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
     }
 }
 
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    if (request->has_hello()) {
-        response->set_hello(request->hello());
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        if (request->has_hello()) {
+            response->set_hello(request->hello());
+        }
+        response->mutable_status()->set_status_code(0);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
     }
-    response->mutable_status()->set_status_code(0);
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -840,7 +1066,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    _slave_replica_worker_pool.offer([=]() {
+    bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
+                                           http_port, token, rowset_path, this]() {
         TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -981,6 +1208,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                     rowset_meta->tablet_id(), node_id, true);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    }
 
     Status::OK().to_protobuf(response->mutable_status());
 }
" + "slave server=" + << request->node_id() << ", is_succeed=" << request->is_succeed() + << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); + StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( + request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 3ea3655974a1db..e5855d98f3b91d 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -194,8 +194,13 @@ class PInternalServiceImpl : public PBackendService { private: ExecEnv* _exec_env; - PriorityThreadPool _tablet_worker_pool; - PriorityThreadPool _slave_replica_worker_pool; + + // every brpc service request should put into thread pool + // the reason see issue #16634 + // define the interface for reading and writing data as heavy interface + // otherwise as light interface + PriorityThreadPool _heavy_work_pool; + PriorityThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 29060613c88f6b..197b7097180f36 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -137,6 +137,18 @@ class BlockingPriorityQueue { return true; } + // Return false if queue full or has been shutdown. + bool try_put(const T& val) { + std::unique_lock unique_lock(_lock); + if (_queue.size() < _max_element && !_shutdown) { + _queue.push(val); + unique_lock.unlock(); + _get_cv.notify_one(); + return true; + } + return false; + } + // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. 
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 29060613c88f6b..197b7097180f36 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -137,6 +137,18 @@ class BlockingPriorityQueue {
         return true;
     }
 
+    // Return false if the queue is full or has been shut down.
+    bool try_put(const T& val) {
+        std::unique_lock<std::mutex> unique_lock(_lock);
+        if (_queue.size() < _max_element && !_shutdown) {
+            _queue.push(val);
+            unique_lock.unlock();
+            _get_cv.notify_one();
+            return true;
+        }
+        return false;
+    }
+
     // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
     void shutdown() {
         {
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 10bcd283937f86..786a5625d9a72b 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -213,6 +213,16 @@ class DorisMetrics {
     IntCounter* upload_rowset_count;
     IntCounter* upload_fail_count;
 
+    UIntGauge* light_work_pool_queue_size;
+    UIntGauge* heavy_work_pool_queue_size;
+    UIntGauge* heavy_work_active_threads;
+    UIntGauge* light_work_active_threads;
+
+    UIntGauge* heavy_work_pool_max_queue_size;
+    UIntGauge* light_work_pool_max_queue_size;
+    UIntGauge* heavy_work_max_threads;
+    UIntGauge* light_work_max_threads;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/hdfs_storage_backend.cpp b/be/src/util/hdfs_storage_backend.cpp
index 6f3baf2d96388d..083f7e3c21c5fa 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -17,6 +17,7 @@
 
 #include "util/hdfs_storage_backend.h"
 
+#include "io/fs/err_utils.h"
 #include "io/hdfs_file_reader.h"
 #include "io/hdfs_reader_writer.h"
 #include "io/hdfs_writer.h"
@@ -125,7 +126,7 @@ Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5
     std::string normal_str = parse_path(remote_path);
     int exists = hdfsExists(_hdfs_fs, normal_str.c_str());
     if (exists != 0) {
-        LOG(INFO) << "path does not exist: " << normal_str << ", err: " << strerror(errno);
+        LOG(INFO) << "path does not exist: " << normal_str << ", err: " << io::hdfs_error();
         return Status::OK();
     }
 
@@ -134,7 +135,7 @@ Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5
     if (files_info == nullptr) {
         std::stringstream ss;
         ss << "failed to list files from remote path: " << normal_str
-           << ", err: " << strerror(errno);
+           << ", err: " << io::hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
diff --git a/be/src/util/hdfs_storage_backend.h b/be/src/util/hdfs_storage_backend.h
index acbf18d2d0307f..9f1b27f6acfea7 100644
--- a/be/src/util/hdfs_storage_backend.h
+++ b/be/src/util/hdfs_storage_backend.h
@@ -17,8 +17,7 @@
 
 #pragma once
 
-#include
-
+#include "io/fs/hdfs.h"
 #include "io/hdfs_builder.h"
 #include "util/storage_backend.h"
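io::hdfs_error() is meant to replace raw strerror/hdfsGetLastError calls wherever a libhdfs call fails, so that the hadoop root cause is included when USE_HADOOP_HDFS is defined; a small usage sketch under that assumption (the hdfsOpenFile call site shown here is illustrative, not code from this patch):

// Illustrative error reporting through the new helper after a failed libhdfs call.
hdfsFile file = hdfsOpenFile(_hdfs_fs, path.c_str(), O_RDONLY, 0, 0, 0);
if (file == nullptr) {
    return Status::InternalError("failed to open hdfs file: " + path +
                                 ", err: " + io::hdfs_error());
}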
- << ", error: " << hdfsGetLastError(); + << ", error: " << io::hdfs_error(); return nullptr; } return hdfs_fs; diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h index f7bfc14b3ad076..a872cfe89dee47 100644 --- a/be/src/util/hdfs_util.h +++ b/be/src/util/hdfs_util.h @@ -17,13 +17,12 @@ #pragma once -#include - #include #include #include #include "common/status.h" +#include "io/fs/hdfs.h" #include "io/hdfs_builder.h" namespace doris { @@ -40,4 +39,4 @@ class HDFSHandle { HDFSHandle() {} }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 64a8e09becea4b..01558cb2a9dffa 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include "common/config.h" #include "gutil/once.h" @@ -37,10 +39,10 @@ namespace doris { namespace { JavaVM* g_vm; -GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; +[[maybe_unused]] std::once_flag g_vm_once; const std::string GetDorisJNIClasspath() { - const auto* classpath = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + const auto* classpath = getenv("DORIS_CLASSPATH"); if (classpath) { return classpath; } else { @@ -66,84 +68,50 @@ const std::string GetDorisJNIClasspath() { } } -void FindOrCreateJavaVM() { +// Only used on non-x86 platform +[[maybe_unused]] void FindOrCreateJavaVM() { int num_vms; - int rv = LibJVMLoader::JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); if (rv == 0) { - JavaVMOption* options; - auto classpath = GetDorisJNIClasspath(); - // The following 4 opts are default opts, - // they can be override by JAVA_OPTS env var. - std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size); - std::string log_path = fmt::format("-DlogPath={}/log/udf-jdbc.log", getenv("DORIS_HOME")); - std::string critical_jni = "-XX:-CriticalJNINatives"; - std::string max_fd_limit = "-XX:-MaxFDLimit"; + std::vector options; char* java_opts = getenv("JAVA_OPTS"); - int no_args; if (java_opts == nullptr) { - no_args = 4; // classpath, heapsize, log path, critical + options = { + GetDorisJNIClasspath(), fmt::format("-Xmx{}", "1g"), + fmt::format("-DlogPath={}/log/jni.log", getenv("DORIS_HOME")), + fmt::format("-Dsun.java.command={}", "DorisBE"), "-XX:-CriticalJNINatives", #ifdef __APPLE__ - no_args++; // -XX:-MaxFDLimit -#endif - options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption)); - options[0].optionString = const_cast(classpath.c_str()); - options[1].optionString = const_cast(heap_size.c_str()); - options[2].optionString = const_cast(log_path.c_str()); - options[3].optionString = const_cast(critical_jni.c_str()); -#ifdef __APPLE__ - // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE - // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp) - // and it can not pass the check performed by storage engine. - // The newer JDK has fixed this issue. - options[4].optionString = const_cast(max_fd_limit.c_str()); + // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE + // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp) + // and it can not pass the check performed by storage engine. + // The newer JDK has fixed this issue. + "-XX:-MaxFDLimit" #endif + }; } else { - // user specified opts - // 1. 
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 64a8e09becea4b..01558cb2a9dffa 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -24,6 +24,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include "common/config.h"
 #include "gutil/once.h"
@@ -37,10 +39,10 @@ namespace doris {
 namespace {
 JavaVM* g_vm;
-GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT;
+[[maybe_unused]] std::once_flag g_vm_once;
 
 const std::string GetDorisJNIClasspath() {
-    const auto* classpath = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
+    const auto* classpath = getenv("DORIS_CLASSPATH");
     if (classpath) {
         return classpath;
     } else {
@@ -66,84 +68,50 @@
     }
 }
 
-void FindOrCreateJavaVM() {
+// Only used on non-x86 platform
+[[maybe_unused]] void FindOrCreateJavaVM() {
     int num_vms;
-    int rv = LibJVMLoader::JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
+    int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
     if (rv == 0) {
-        JavaVMOption* options;
-        auto classpath = GetDorisJNIClasspath();
-        // The following 4 opts are default opts,
-        // they can be override by JAVA_OPTS env var.
-        std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size);
-        std::string log_path = fmt::format("-DlogPath={}/log/udf-jdbc.log", getenv("DORIS_HOME"));
-        std::string critical_jni = "-XX:-CriticalJNINatives";
-        std::string max_fd_limit = "-XX:-MaxFDLimit";
+        std::vector<std::string> options;
         char* java_opts = getenv("JAVA_OPTS");
-        int no_args;
         if (java_opts == nullptr) {
-            no_args = 4; // classpath, heapsize, log path, critical
+            options = {
+                    GetDorisJNIClasspath(), fmt::format("-Xmx{}", "1g"),
+                    fmt::format("-DlogPath={}/log/jni.log", getenv("DORIS_HOME")),
+                    fmt::format("-Dsun.java.command={}", "DorisBE"), "-XX:-CriticalJNINatives",
 #ifdef __APPLE__
-            no_args++; // -XX:-MaxFDLimit
-#endif
-            options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
-            options[0].optionString = const_cast<char*>(classpath.c_str());
-            options[1].optionString = const_cast<char*>(heap_size.c_str());
-            options[2].optionString = const_cast<char*>(log_path.c_str());
-            options[3].optionString = const_cast<char*>(critical_jni.c_str());
-#ifdef __APPLE__
-            // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE
-            // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp)
-            // and it can not pass the check performed by storage engine.
-            // The newer JDK has fixed this issue.
-            options[4].optionString = const_cast<char*>(max_fd_limit.c_str());
+                    // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE
+                    // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp)
+                    // and it can not pass the check performed by storage engine.
+                    // The newer JDK has fixed this issue.
+                    "-XX:-MaxFDLimit"
 #endif
+            };
         } else {
-            // user specified opts
-            // 1. find the number of args
-            java_opts = strdup(java_opts);
-            char *str, *token, *save_ptr;
-            char jvm_arg_delims[] = " ";
-            for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
-                token = strtok_r(str, jvm_arg_delims, &save_ptr);
-                if (token == nullptr) {
-                    break;
-                }
-            }
-            free(java_opts);
-            // 2. set args
-            options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
-            options[0].optionString = const_cast<char*>(classpath.c_str());
-            java_opts = getenv("JAVA_OPTS");
-            if (java_opts != NULL) {
-                java_opts = strdup(java_opts);
-                for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
-                    token = strtok_r(str, jvm_arg_delims, &save_ptr);
-                    if (token == nullptr) {
-                        break;
-                    }
-                    options[no_args].optionString = token;
-                }
-            }
+            std::istringstream stream(java_opts);
+            options = std::vector<std::string>(std::istream_iterator<std::string> {stream},
+                                               std::istream_iterator<std::string>());
+            options.push_back(GetDorisJNIClasspath());
+        }
+        std::unique_ptr<JavaVMOption[]> jvm_options(new JavaVMOption[options.size()]);
+        for (int i = 0; i < options.size(); ++i) {
+            jvm_options[i] = {const_cast<char*>(options[i].c_str()), nullptr};
         }
 
         JNIEnv* env;
         JavaVMInitArgs vm_args;
         vm_args.version = JNI_VERSION_1_8;
-        vm_args.options = options;
-        vm_args.nOptions = no_args;
+        vm_args.options = jvm_options.get();
+        vm_args.nOptions = options.size();
         // Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max size config.
         vm_args.ignoreUnrecognized = JNI_FALSE;
 
-        jint res = LibJVMLoader::JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
+        jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
         if (JNI_OK != res) {
             DCHECK(false) << "Failed to create JVM, code= " << res;
         }
-
-        if (java_opts != nullptr) {
-            free(java_opts);
-        }
-        free(options);
     } else {
         CHECK_EQ(rv, 0) << "Could not find any created Java VM";
         CHECK_EQ(num_vms, 1) << "No VMs returned";
@@ -196,7 +164,8 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) {
 
 Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     DCHECK(!tls_env_) << "Call GetJNIEnv() fast path";
-    GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM);
+#ifdef USE_LIBHDFS3
+    std::call_once(g_vm_once, FindOrCreateJavaVM);
     int rc = g_vm->GetEnv(reinterpret_cast<void**>(&tls_env_), JNI_VERSION_1_8);
     if (rc == JNI_EDETACHED) {
         rc = g_vm->AttachCurrentThread((void**)&tls_env_, nullptr);
@@ -204,6 +173,10 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     if (rc != 0 || tls_env_ == nullptr) {
         return Status::InternalError("Unable to get JVM: {}", rc);
     }
+#else
+    // the hadoop libhdfs will do all the stuff
+    tls_env_ = getJNIEnv();
+#endif
     *env = tls_env_;
     return Status::OK();
 }
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 0e551f17cf3500..dfef2d3be75a3f 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -23,6 +23,11 @@
 #include "gutil/macros.h"
 #include "util/thrift_util.h"
 
+#ifdef USE_HADOOP_HDFS
+// defined in hadoop_hdfs/hdfs.h
+extern "C" JNIEnv* getJNIEnv(void);
+#endif
+
 namespace doris {
 
 #define RETURN_ERROR_IF_EXC(env) \
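The rewritten option handling simply whitespace-splits JAVA_OPTS with stream iterators, so quoting inside the variable is not preserved; a standalone illustration of exactly that splitting (the option string used here is an assumed example, not a default from the patch):

// Standalone illustration of the istream_iterator-based JAVA_OPTS splitting used above.
#include <iostream>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

int main() {
    std::string java_opts = "-Xmx2g -DlogPath=/tmp/jni.log -XX:-CriticalJNINatives"; // assumed example
    std::istringstream stream(java_opts);
    std::vector<std::string> options(std::istream_iterator<std::string>{stream},
                                     std::istream_iterator<std::string>());
    for (const auto& opt : options) {
        std::cout << opt << '\n'; // each token becomes one JavaVMOption string
    }
    return 0;
}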
diff --git a/be/src/util/libjvm_loader.cpp b/be/src/util/libjvm_loader.cpp
index 127d28c2de6efb..6175da6081f264 100644
--- a/be/src/util/libjvm_loader.cpp
+++ b/be/src/util/libjvm_loader.cpp
@@ -25,6 +25,15 @@
 
 #include "common/status.h"
 
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_GetCreatedJavaVMs(JavaVM** vm_buf, jsize bufLen,
+                                                          jsize* numVMs) {
+    return doris::LibJVMLoader::JNI_GetCreatedJavaVMs(vm_buf, bufLen, numVMs);
+}
+
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_CreateJavaVM(JavaVM** pvm, void** penv, void* args) {
+    return doris::LibJVMLoader::JNI_CreateJavaVM(pvm, penv, args);
+}
+
 namespace {
 
 #ifndef __APPLE__
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 32fc637970f06c..1c1bf8117e462a 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -54,7 +54,7 @@ class PriorityThreadPool {
     // queue exceeds this size, subsequent calls to Offer will block until there is
     // capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     std::bind(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -86,6 +86,11 @@ class PriorityThreadPool {
         return _work_queue.blocking_put(task);
     }
 
+    virtual bool try_offer(WorkFunction func) {
+        PriorityThreadPool::Task task = {0, func, 0};
+        return _work_queue.try_put(task);
+    }
+
     // Shuts the thread pool down, causing the work queue to cease accepting offered work
     // and the worker threads to terminate once they have processed their current work item.
     // Returns once the shutdown flag has been set, does not wait for the threads to
@@ -100,6 +105,7 @@ class PriorityThreadPool {
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop the worker
     // threads and Join to wait until they are finished.
@@ -135,7 +141,9 @@ class PriorityThreadPool {
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
+                _active_threads++;
                 task.work_function();
+                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -150,6 +158,7 @@ class PriorityThreadPool {
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
+    std::atomic<int> _active_threads;
 };
 
 } // namespace doris
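Unlike offer(), which blocks when the queue is full, the new try_offer() returns false immediately, which is what lets the brpc handlers fail fast instead of stalling a brpc worker thread; a brief usage sketch (the submit_example wrapper is illustrative only):

// Illustrative non-blocking submission through the new try_offer().
#include "util/priority_thread_pool.hpp"

void submit_example(doris::PriorityThreadPool& pool) {
    bool accepted = pool.try_offer([]() {
        // ... do some work ...
    });
    if (!accepted) {
        // Queue full (or pool shutting down): the caller decides how to degrade,
        // e.g. reject the rpc instead of blocking until space frees up.
    }
}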
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 89c40947c04053..231986940ea03a 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -20,7 +20,8 @@ set -eo pipefail
 
 curdir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
 
-if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
+MACHINE_OS=$(uname -s)
+if [[ "${MACHINE_OS}" == 'Darwin' ]] && command -v brew &>/dev/null; then
     PATH="$(brew --prefix)/opt/gnu-getopt/bin:${PATH}"
     export PATH
 fi
@@ -70,16 +71,36 @@ if [[ "$(uname -s)" != 'Darwin' ]]; then
     fi
 fi
 
-# add libs to CLASSPATH
+# add java libs
 for f in "${DORIS_HOME}/lib"/*.jar; do
-    if [[ -z "${DORIS_JNI_CLASSPATH_PARAMETER}" ]]; then
-        export DORIS_JNI_CLASSPATH_PARAMETER="${f}"
+    if [[ -z "${DORIS_CLASSPATH}" ]]; then
+        export DORIS_CLASSPATH="${f}"
     else
-        export DORIS_JNI_CLASSPATH_PARAMETER="${f}:${DORIS_JNI_CLASSPATH_PARAMETER}"
+        export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
    fi
 done
-# DORIS_JNI_CLASSPATH_PARAMETER is used to configure additional jar path to jvm. e.g. -Djava.class.path=$DORIS_HOME/lib/java-udf.jar
-export DORIS_JNI_CLASSPATH_PARAMETER="-Djava.class.path=${DORIS_JNI_CLASSPATH_PARAMETER}"
+
+if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
+    # add hadoop libs
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+fi
+
+# CLASSPATH and LIBHDFS_OPTS are used by hadoop libhdfs;
+# conf/ is added so that hadoop libhdfs can read the .xml config files in conf/
+export CLASSPATH="${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+# DORIS_CLASSPATH is for self-managed jni
+export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
 
 jdk_version() {
     local java_cmd="${1}"
@@ -230,11 +251,28 @@ set_tcmalloc_heap_limit() {
 
 # set_tcmalloc_heap_limit || exit 1
 
-## set hdfs conf
+## set hdfs3 conf
 if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then
     export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
 fi
 
+if [[ -z ${JAVA_OPTS} ]]; then
+    # set default JAVA_OPTS
+    CUR_DATE=$(date +%Y%m%d-%H%M%S)
+    JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+fi
+
+if [[ "${MACHINE_OS}" == "Darwin" ]]; then
+    JAVA_OPTS="${JAVA_OPTS} -XX:-MaxFDLimit"
+fi
+
+# set LIBHDFS_OPTS for hadoop libhdfs
+export LIBHDFS_OPTS="${JAVA_OPTS}"
+
+#echo "CLASSPATH: ${CLASSPATH}"
+#echo "LD_LIBRARY_PATH: ${LD_LIBRARY_PATH}"
+#echo "LIBHDFS_OPTS: ${LIBHDFS_OPTS}"
+
 # see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
 export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
diff --git a/build.sh b/build.sh
index 9eaa16d8b64c71..d8296e9bbf9291 100755
--- a/build.sh
+++ b/build.sh
@@ -539,6 +539,10 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
         cp -r -p "${DORIS_HOME}/be/output/bin"/* "${DORIS_OUTPUT}/be/bin"/
         cp -r -p "${DORIS_HOME}/be/output/conf"/* "${DORIS_OUTPUT}/be/conf"/
 
+        if [[ -d "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/" ]]; then
+            cp -r -p "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/" "${DORIS_OUTPUT}/be/lib/"
+        fi
+
        if [[ "${BUILD_JAVA_UDF}" -eq 0 ]]; then
            echo -e "\033[33;1mWARNNING: \033[37;1mDisable Java UDF support in be.conf due to the BE was built without Java UDF.\033[0m"
            cat >>"${DORIS_OUTPUT}/be/conf/be.conf" <