Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,12 @@ CONF_mInt64(streaming_load_json_max_mb, "100");
CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
// the timeout of a rpc to open the tablet writer in remote BE.
// short operation time, can set a short timeout
CONF_mInt32(tablet_writer_open_rpc_timeout_sec, "60");
CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
CONF_mBool(tablet_writer_ignore_eovercrowded, "false");

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
CONF_mInt32(olap_table_sink_send_interval_ms, "1");

// Fragment thread pool
Expand Down
27 changes: 18 additions & 9 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ void NodeChannel::open() {
// This ref is for RPC's reference
_open_closure->ref();
_open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000);
if (config::tablet_writer_ignore_eovercrowded) {
_open_closure->cntl.ignore_eovercrowded();
}
_stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result,
_open_closure);
request.release_id();
Expand Down Expand Up @@ -294,6 +297,9 @@ void NodeChannel::cancel() {

closure->ref();
closure->cntl.set_timeout_ms(_rpc_timeout_ms);
if (config::tablet_writer_ignore_eovercrowded) {
closure->cntl.ignore_eovercrowded();
}
_stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure);
request.release_id();
}
Expand Down Expand Up @@ -327,6 +333,9 @@ int NodeChannel::try_send_and_fetch_status() {

_add_batch_closure->reset();
_add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms);
if (config::tablet_writer_ignore_eovercrowded) {
_add_batch_closure->cntl.ignore_eovercrowded();
}

if (request.eos()) {
for (auto pid : _parent->_partition_ids) {
Expand Down Expand Up @@ -674,13 +683,13 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
}

Status OlapTableSink::close(RuntimeState* state, Status close_status) {
if (_is_closed) {
/// The close method may be called twice.
/// In the open_internal() method of plan_fragment_executor, close is called once.
/// If an error occurs in this call, it will be called again in fragment_mgr.
/// So here we use a flag to prevent repeated close operations.
return _close_status;
}
if (_is_closed) {
/// The close method may be called twice.
/// In the open_internal() method of plan_fragment_executor, close is called once.
/// If an error occurs in this call, it will be called again in fragment_mgr.
/// So here we use a flag to prevent repeated close operations.
return _close_status;
}
Status status = close_status;
if (status.ok()) {
// only if status is ok can we call this _profile->total_time_counter().
Expand Down Expand Up @@ -758,8 +767,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
Expr::close(_output_expr_ctxs, state);
_output_batch.reset();

_close_status = status;
_is_closed = true;
_close_status = status;
_is_closed = true;
return status;
}

Expand Down
8 changes: 8 additions & 0 deletions docs/en/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,14 @@ When writing is too frequent and the disk time is insufficient, you can configur

### `tablet_writer_open_rpc_timeout_sec`

### `tablet_writer_ignore_eovercrowded`

* Type: bool
* Description: Used to ignore brpc error '[E1011]The server is overcrowded' when writing data.
* Default value: false

When meet '[E1011]The server is overcrowded' error, you can tune the configuration `brpc_socket_max_unwritten_bytes`, but it can't be modified at runtime. Set it to `true` to avoid writing failed temporarily. Notice that, it only effects `write`, other rpc requests will still check if overcrowded.

### `tc_free_memory_rate`

### `tc_max_total_thread_cache_bytes`
Expand Down
8 changes: 8 additions & 0 deletions docs/zh-CN/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,14 @@ Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大

### `tablet_writer_open_rpc_timeout_sec`

### `tablet_writer_ignore_eovercrowded`

* 类型:bool
* 描述:写入时可忽略brpc的'[E1011]The server is overcrowded'错误。
* 默认值:false

当遇到'[E1011]The server is overcrowded'的错误时,可以调整配置项`brpc_socket_max_unwritten_bytes`,但这个配置项不能动态调整。所以可通过设置此项为`true`来临时避免写失败。注意,此配置项只影响写流程,其他的rpc请求依旧会检查是否overcrowded。

### `tc_free_memory_rate`

### `tc_max_total_thread_cache_bytes`
Expand Down