diff --git a/be/src/common/config.h b/be/src/common/config.h index 4454c7fed17b61..66e52a3be02a3c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -330,8 +330,12 @@ CONF_mInt64(streaming_load_json_max_mb, "100"); CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); // the timeout of a rpc to open the tablet writer in remote BE. // short operation time, can set a short timeout -CONF_mInt32(tablet_writer_open_rpc_timeout_sec, "60"); +CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); +// You can ignore brpc error '[E1011]The server is overcrowded' when writing data. +CONF_mBool(tablet_writer_ignore_eovercrowded, "false"); + // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. +// You may need to lower the speed when the sink receiver bes are too busy. CONF_mInt32(olap_table_sink_send_interval_ms, "1"); // Fragment thread pool diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 98f5b35521fe14..47305d92827b05 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -110,6 +110,9 @@ void NodeChannel::open() { // This ref is for RPC's reference _open_closure->ref(); _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + if (config::tablet_writer_ignore_eovercrowded) { + _open_closure->cntl.ignore_eovercrowded(); + } _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, _open_closure); request.release_id(); @@ -294,6 +297,9 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (config::tablet_writer_ignore_eovercrowded) { + closure->cntl.ignore_eovercrowded(); + } _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); } @@ -327,6 +333,9 @@ int NodeChannel::try_send_and_fetch_status() { _add_batch_closure->reset(); _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (config::tablet_writer_ignore_eovercrowded) { + _add_batch_closure->cntl.ignore_eovercrowded(); + } if (request.eos()) { for (auto pid : _parent->_partition_ids) { @@ -674,13 +683,13 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { } Status OlapTableSink::close(RuntimeState* state, Status close_status) { - if (_is_closed) { - /// The close method may be called twice. - /// In the open_internal() method of plan_fragment_executor, close is called once. - /// If an error occurs in this call, it will be called again in fragment_mgr. - /// So here we use a flag to prevent repeated close operations. - return _close_status; - } + if (_is_closed) { + /// The close method may be called twice. + /// In the open_internal() method of plan_fragment_executor, close is called once. + /// If an error occurs in this call, it will be called again in fragment_mgr. + /// So here we use a flag to prevent repeated close operations. + return _close_status; + } Status status = close_status; if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter(). @@ -758,8 +767,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { Expr::close(_output_expr_ctxs, state); _output_batch.reset(); - _close_status = status; - _is_closed = true; + _close_status = status; + _is_closed = true; return status; } diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 73e04761236768..3437639938c8a6 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -794,6 +794,14 @@ When writing is too frequent and the disk time is insufficient, you can configur ### `tablet_writer_open_rpc_timeout_sec` +### `tablet_writer_ignore_eovercrowded` + +* Type: bool +* Description: Used to ignore brpc error '[E1011]The server is overcrowded' when writing data. +* Default value: false + +When meet '[E1011]The server is overcrowded' error, you can tune the configuration `brpc_socket_max_unwritten_bytes`, but it can't be modified at runtime. Set it to `true` to avoid writing failed temporarily. Notice that, it only effects `write`, other rpc requests will still check if overcrowded. + ### `tc_free_memory_rate` ### `tc_max_total_thread_cache_bytes` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 2a0030773997f2..7cc4c913655bbb 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -795,6 +795,14 @@ Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大 ### `tablet_writer_open_rpc_timeout_sec` +### `tablet_writer_ignore_eovercrowded` + +* 类型:bool +* 描述:写入时可忽略brpc的'[E1011]The server is overcrowded'错误。 +* 默认值:false + +当遇到'[E1011]The server is overcrowded'的错误时,可以调整配置项`brpc_socket_max_unwritten_bytes`,但这个配置项不能动态调整。所以可通过设置此项为`true`来临时避免写失败。注意,此配置项只影响写流程,其他的rpc请求依旧会检查是否overcrowded。 + ### `tc_free_memory_rate` ### `tc_max_total_thread_cache_bytes`