From 0eacc0e8116ebfc855c328c8cc627641096a95d1 Mon Sep 17 00:00:00 2001 From: huangwei Date: Mon, 28 Dec 2020 11:04:05 +0800 Subject: [PATCH 1/3] [] support ignoring eovercrowded when adding batch --- be/src/common/config.h | 4 ++++ be/src/exec/tablet_sink.cpp | 3 +++ 2 files changed, 7 insertions(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index 6bef85a48420ca..1fc736f45a278f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -366,12 +366,16 @@ CONF_Int32(streaming_load_rpc_max_alive_time_sec, "1200"); // the timeout of a rpc to open the tablet writer in remote BE. // short operation time, can set a short timeout CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); +// You can ignore brpc error '[E1011]The server is overcrowded' when writing data. +CONF_mBool(tablet_writer_ignore_eovercrowded, "false"); // Deprecated, use query_timeout instead // the timeout of a rpc to process one batch in tablet writer. // you may need to increase this timeout if using larger 'streaming_load_max_mb', // or encounter 'tablet writer write failed' error when loading. // CONF_Int32(tablet_writer_rpc_timeout_sec, "600"); + // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. +// You may need to lower the speed when the sink receiver bes are too busy. CONF_mInt32(olap_table_sink_send_interval_ms, "1"); // Fragment thread pool diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 98f5b35521fe14..3452c9e47b7b04 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -327,6 +327,9 @@ int NodeChannel::try_send_and_fetch_status() { _add_batch_closure->reset(); _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (config::tablet_writer_ignore_eovercrowded) { + _add_batch_closure->cntl.ignore_eovercrowded(); + } if (request.eos()) { for (auto pid : _parent->_partition_ids) { From 37c0861e5bf51c554f388956deda186813fe5359 Mon Sep 17 00:00:00 2001 From: huangwei Date: Tue, 5 Jan 2021 11:08:20 +0800 Subject: [PATCH 2/3] [] fix& add doc --- be/src/exec/tablet_sink.cpp | 24 ++++++++++++------- .../administrator-guide/config/be_config.md | 8 +++++++ .../administrator-guide/config/be_config.md | 8 +++++++ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 3452c9e47b7b04..47305d92827b05 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -110,6 +110,9 @@ void NodeChannel::open() { // This ref is for RPC's reference _open_closure->ref(); _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + if (config::tablet_writer_ignore_eovercrowded) { + _open_closure->cntl.ignore_eovercrowded(); + } _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, _open_closure); request.release_id(); @@ -294,6 +297,9 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (config::tablet_writer_ignore_eovercrowded) { + closure->cntl.ignore_eovercrowded(); + } _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); } @@ -677,13 +683,13 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { } Status OlapTableSink::close(RuntimeState* state, Status close_status) { - if (_is_closed) { - /// The close method may be called twice. - /// In the open_internal() method of plan_fragment_executor, close is called once. - /// If an error occurs in this call, it will be called again in fragment_mgr. - /// So here we use a flag to prevent repeated close operations. - return _close_status; - } + if (_is_closed) { + /// The close method may be called twice. + /// In the open_internal() method of plan_fragment_executor, close is called once. + /// If an error occurs in this call, it will be called again in fragment_mgr. + /// So here we use a flag to prevent repeated close operations. + return _close_status; + } Status status = close_status; if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter(). @@ -761,8 +767,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { Expr::close(_output_expr_ctxs, state); _output_batch.reset(); - _close_status = status; - _is_closed = true; + _close_status = status; + _is_closed = true; return status; } diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 2a9096f199faac..4b596a1fb42a73 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -804,6 +804,14 @@ When writing is too frequent and the disk time is insufficient, you can configur ### `tablet_writer_open_rpc_timeout_sec` +### `tablet_writer_ignore_eovercrowded` + +* Type: bool +* Description: Used to ignore brpc error '[E1011]The server is overcrowded' when writing data. +* Default value: false + +When meet '[E1011]The server is overcrowded' error, you can tune the configuration `brpc_socket_max_unwritten_bytes`, but it can't be modified at runtime. Set it to `true` to avoid writing failed temporarily. Notice that, it only effects `write`, other rpc requests will still check if overcrowded. + ### `tc_free_memory_rate` ### `tc_max_total_thread_cache_bytes` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 0bc0d38310e47a..8633269d3dc36a 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -803,6 +803,14 @@ Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大 ### `tablet_writer_open_rpc_timeout_sec` +### `tablet_writer_ignore_eovercrowded` + +* 类型:bool +* 描述:写入时可忽略brpc的'[E1011]The server is overcrowded'错误。 +* 默认值:false + +当遇到'[E1011]The server is overcrowded'的错误时,可以调整配置项`brpc_socket_max_unwritten_bytes`,但这个配置项不能动态调整。所以可通过设置此项为`true`来临时避免写失败。注意,此配置项只影响写流程,其他的rpc请求依旧会检查是否overcrowded。 + ### `tc_free_memory_rate` ### `tc_max_total_thread_cache_bytes` From 1c69ccc376750a4f72bfac41892af5610118d1f2 Mon Sep 17 00:00:00 2001 From: HuangWei Date: Wed, 6 Jan 2021 10:32:47 +0800 Subject: [PATCH 3/3] Update config.h --- be/src/common/config.h | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 85386918d02e26..66e52a3be02a3c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -330,7 +330,6 @@ CONF_mInt64(streaming_load_json_max_mb, "100"); CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); // the timeout of a rpc to open the tablet writer in remote BE. // short operation time, can set a short timeout - CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. CONF_mBool(tablet_writer_ignore_eovercrowded, "false");