From 59fc99cec8aaae04cb7c931d42387460ef283459 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Thu, 3 Jun 2021 20:13:51 +0800 Subject: [PATCH 01/11] Support send batch parallelism for olap table sink --- be/src/common/config.h | 6 + be/src/exec/tablet_sink.cpp | 130 +++++++++++------- be/src/exec/tablet_sink.h | 18 ++- be/src/http/action/stream_load.cpp | 14 +- be/src/http/http_common.h | 4 +- be/src/olap/memtable_flush_executor.cpp | 11 +- be/src/olap/memtable_flush_executor.h | 11 +- be/src/service/internal_service.cpp | 7 +- .../administrator-guide/config/be_config.md | 6 + docs/en/administrator-guide/variables.md | 4 + .../Data Manipulation/BROKER LOAD.md | 2 + .../Data Manipulation/ROUTINE LOAD.md | 3 + .../Data Manipulation/STREAM LOAD.md | 2 + .../administrator-guide/config/be_config.md | 6 + docs/zh-CN/administrator-guide/variables.md | 5 +- .../Data Manipulation/BROKER LOAD.md | 57 ++++---- .../Data Manipulation/ROUTINE LOAD.md | 6 +- .../Data Manipulation/STREAM LOAD.md | 2 + .../doris/analysis/CreateRoutineLoadStmt.java | 13 ++ .../org/apache/doris/analysis/InsertStmt.java | 3 +- .../org/apache/doris/analysis/LoadStmt.java | 21 ++- .../doris/load/loadv2/BrokerLoadJob.java | 2 +- .../org/apache/doris/load/loadv2/LoadJob.java | 5 + .../doris/load/loadv2/LoadLoadingTask.java | 6 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 6 +- .../load/routineload/RoutineLoadJob.java | 13 ++ .../doris/load/update/UpdatePlanner.java | 3 +- .../apache/doris/planner/OlapTableSink.java | 3 +- .../doris/planner/StreamLoadPlanner.java | 2 +- .../org/apache/doris/qe/SessionVariable.java | 16 ++- .../org/apache/doris/task/LoadTaskInfo.java | 1 + .../org/apache/doris/task/StreamLoadTask.java | 9 ++ .../doris/load/loadv2/BrokerLoadJobTest.java | 4 +- .../doris/planner/OlapTableSinkTest.java | 8 +- gensrc/proto/internal_service.proto | 1 + gensrc/thrift/DataSinks.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + 37 files changed, 293 insertions(+), 119 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 96a81280a3bad4..aa818211551171 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -621,6 +621,12 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); // else we will call sync method CONF_mBool(runtime_filter_use_async_rpc, "true"); +// max send batch parallelism for OlapTableSink +// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism, +// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism +CONF_mInt32(max_send_batch_parallelism, "1"); +CONF_Validator(max_send_batch_parallelism, [](const int config) -> bool { return config >= 1; }); + } // namespace config } // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a9488ec7de5227..44883ed27a0d31 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -32,6 +32,7 @@ #include "util/debug/sanitizer_scopes.h" #include "util/monotime.h" #include "util/time.h" +#include "util/threadpool.h" #include "util/uid_util.h" namespace doris { @@ -194,11 +195,10 @@ Status NodeChannel::open_wait() { if (result.has_execution_time_us()) { _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); - _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); + _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); _add_batch_counter.add_batch_num++; } }); - return status; } @@ -342,68 +342,75 @@ void NodeChannel::cancel() { request.release_id(); } -int NodeChannel::try_send_and_fetch_status() { +int NodeChannel::try_send_and_fetch_status(std::unique_ptr& thread_pool) { auto st = none_of({_cancelled, _send_finished}); if (!st.ok()) { return 0; } if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { - SCOPED_ATOMIC_TIMER(&_actual_consume_ns); - AddBatchReq send_batch; - { - debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; - std::lock_guard l(_pending_batches_lock); - DCHECK(!_pending_batches.empty()); - send_batch = std::move(_pending_batches.front()); - _pending_batches.pop(); - _pending_batches_num--; + if (!thread_pool || !thread_pool->submit_func([this]() { + this->try_send_batch(); + }).ok()) { + try_send_batch(); } + } - auto row_batch = std::move(send_batch.first); - auto request = std::move(send_batch.second); // doesn't need to be saved in heap + return _send_finished ? 0 : 1; +} - // tablet_ids has already set when add row - request.set_packet_seq(_next_packet_seq); - if (row_batch->num_rows() > 0) { - SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); - row_batch->serialize(request.mutable_row_batch()); - } +void NodeChannel::try_send_batch() { + SCOPED_ATOMIC_TIMER(&_actual_consume_ns); + AddBatchReq send_batch; + { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + std::lock_guard l(_pending_batches_lock); + DCHECK(!_pending_batches.empty()); + send_batch = std::move(_pending_batches.front()); + _pending_batches.pop(); + _pending_batches_num--; + } - _add_batch_closure->reset(); - int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; - if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) { - if (remain_ms <= 0 && !request.eos()) { - cancel(); - return 0; - } else { - remain_ms = _min_rpc_timeout_ms; - } - } - _add_batch_closure->cntl.set_timeout_ms(remain_ms); - if (config::tablet_writer_ignore_eovercrowded) { - _add_batch_closure->cntl.ignore_eovercrowded(); - } + auto row_batch = std::move(send_batch.first); + auto request = std::move(send_batch.second); // doesn't need to be saved in heap - if (request.eos()) { - for (auto pid : _parent->_partition_ids) { - request.add_partition_ids(pid); - } + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + if (row_batch->num_rows() > 0) { + SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); + row_batch->serialize(request.mutable_row_batch()); + } - // eos request must be the last request - _add_batch_closure->end_mark(); - _send_finished = true; - DCHECK(_pending_batches_num == 0); + _add_batch_closure->reset(); + int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; + if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) { + if (remain_ms <= 0 && !request.eos()) { + cancel(); + } else { + remain_ms = _min_rpc_timeout_ms; } + } + _add_batch_closure->cntl.set_timeout_ms(remain_ms); + if (config::tablet_writer_ignore_eovercrowded) { + _add_batch_closure->cntl.ignore_eovercrowded(); + } - _add_batch_closure->set_in_flight(); - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, - &_add_batch_closure->result, _add_batch_closure); + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } - _next_packet_seq++; + // eos request must be the last request + _add_batch_closure->end_mark(); + _send_finished = true; + DCHECK(_pending_batches_num == 0); } - return _send_finished ? 0 : 1; + _add_batch_closure->set_in_flight(); + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, + &_add_batch_closure->result, _add_batch_closure); + + _next_packet_seq++; } Status NodeChannel::none_of(std::initializer_list vars) { @@ -528,6 +535,9 @@ Status OlapTableSink::init(const TDataSink& t_sink) { } else { _load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; } + if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) { + _send_batch_parallelism = table_sink.send_batch_parallelism; + } return Status::OK(); } @@ -815,10 +825,11 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: "; + << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait execution time(ms)/num: "; for (auto const& pair : node_add_batch_counter_map) { ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) - << ")(" << pair.second.add_batch_num << ")} "; + << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")(" + << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); } else { @@ -1003,11 +1014,28 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* void OlapTableSink::_send_batch_process() { SCOPED_TIMER(_non_blocking_send_timer); + std::unique_ptr thread_pool = nullptr; + if (_send_batch_parallelism > 1) { + int send_batch_pool_queue_size = 0; + for (auto index_channel : _channels) { + send_batch_pool_queue_size += index_channel->num_node_channels(); + } + int32_t send_batch_parallelism = _send_batch_parallelism <= config::max_send_batch_parallelism ? + _send_batch_parallelism : config::max_send_batch_parallelism; + auto s = ThreadPoolBuilder("SendBatchThreadPool") + .set_min_threads(send_batch_parallelism) + .set_max_threads(send_batch_parallelism) + .set_max_queue_size(send_batch_pool_queue_size) + .build(&thread_pool); + if (!s.ok()) { + thread_pool.reset(); + } + } do { int running_channels_num = 0; for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) { - running_channels_num += ch->try_send_and_fetch_status(); + index_channel->for_each_node_channel([&running_channels_num, &thread_pool](NodeChannel* ch) { + running_channels_num += ch->try_send_and_fetch_status(thread_pool); }); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 7f0dcb54cab278..4d1746a83593b6 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -44,6 +44,7 @@ class Bitmap; class MemTracker; class RuntimeProfile; class RowDescriptor; +class ThreadPool; class Tuple; class TupleDescriptor; class ExprContext; @@ -58,12 +59,12 @@ struct AddBatchCounter { // total execution time of a add_batch rpc int64_t add_batch_execution_time_us = 0; // lock waiting time in a add_batch rpc - int64_t add_batch_wait_lock_time_us = 0; + int64_t add_batch_wait_execution_time_us = 0; // number of add_batch call int64_t add_batch_num = 0; AddBatchCounter& operator+=(const AddBatchCounter& rhs) { add_batch_execution_time_us += rhs.add_batch_execution_time_us; - add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us; + add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us; add_batch_num += rhs.add_batch_num; return *this; } @@ -169,7 +170,9 @@ class NodeChannel { // 1: running, haven't reach eos. // only allow 1 rpc in flight // plz make sure, this func should be called after open_wait(). - int try_send_and_fetch_status(); + int try_send_and_fetch_status(std::unique_ptr& thread_pool); + + void try_send_batch(); void time_report(std::unordered_map* add_batch_counter_map, int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, @@ -410,10 +413,11 @@ class OlapTableSink : public DataSink { // the timeout of load channels opened by this tablet sink. in second int64_t _load_channel_timeout_s = 0; - // True if this sink has been closed once - bool _is_closed = false; - // Save the status of close() method - Status _close_status; + int32_t _send_batch_parallelism = 1; + // True if this sink has been closed once + bool _is_closed = false; + // Save the status of close() method + Status _close_status; }; } // namespace stream_load diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index ac409b100cb6ea..2177d12f9f697d 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -237,8 +237,10 @@ int StreamLoadAction::on_header(HttpRequest* req) { HttpChannel::send_reply(req, str); streaming_load_current_processing->increment(-1); #ifndef BE_TEST - str = ctx->prepare_stream_load_record(str); - _sava_stream_load_record(ctx, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _sava_stream_load_record(ctx, str); + } #endif return -1; } @@ -485,6 +487,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)); } + if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) { + try { + request.__set_send_batch_parallelism(std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM))); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid send_batch_parallelism format"); + } + } + if (ctx->timeout_second != -1) { request.__set_timeout(ctx->timeout_second); } diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 50cfbae7e65df4..2bed1d5de0243c 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -50,6 +50,8 @@ static const std::string HTTP_FUNCTION_COLUMN = "function_column"; static const std::string HTTP_SEQUENCE_COL = "sequence_col"; static const std::string HTTP_COMPRESS_TYPE = "compress_type"; -static const std::string HTTP_100_CONTINUE = "100-continue"; +static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + + static const std::string HTTP_100_CONTINUE = "100-continue"; } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index fb78a5503a9801..d257ab6450a983 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -21,11 +21,13 @@ #include "olap/memtable.h" #include "util/scoped_cleanup.h" +#include "util/time.h" namespace doris { std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { - os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 + os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS + << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS << ", flush count=" << stat.flush_count << ", flush bytes: " << stat.flush_size_bytes << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")"; @@ -39,7 +41,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { // its reference count is not 0. OLAPStatus FlushToken::submit(const std::shared_ptr& memtable) { RETURN_NOT_OK(_flush_status.load()); - _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable)); + int64_t submit_task_time = MonotonicNanos(); + _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time)); return OLAP_SUCCESS; } @@ -52,9 +55,9 @@ OLAPStatus FlushToken::wait() { return _flush_status.load(); } -void FlushToken::_flush_memtable(std::shared_ptr memtable) { +void FlushToken::_flush_memtable(std::shared_ptr memtable, int64_t submit_task_time) { + _stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time); SCOPED_CLEANUP({ memtable.reset(); }); - // If previous flush has failed, return directly if (_flush_status.load() != OLAP_SUCCESS) { return; diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 4b6795b77d8051..8b81bde524b3d8 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -35,10 +35,11 @@ class MemTable; // the statistic of a certain flush handler. // use atomic because it may be updated by multi threads struct FlushStatistic { - int64_t flush_time_ns = 0; - int64_t flush_count = 0; - int64_t flush_size_bytes = 0; - int64_t flush_disk_size_bytes = 0; + std::atomic_uint64_t flush_time_ns = 0; + std::atomic_uint64_t flush_count = 0; + std::atomic_uint64_t flush_size_bytes = 0; + std::atomic_uint64_t flush_disk_size_bytes = 0; + std::atomic_uint64_t flush_wait_time_ns = 0; }; std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); @@ -68,7 +69,7 @@ class FlushToken { const FlushStatistic& get_stats() const { return _stats; } private: - void _flush_memtable(std::shared_ptr mem_table); + void _flush_memtable(std::shared_ptr mem_table, int64_t submit_task_time); std::unique_ptr _flush_token; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index f4bdb496184fe1..adad26bc200307 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -110,7 +110,9 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr // add batch maybe cost a lot of time, and this callback thread will be held. // this will influence query execution, because the pthreads under bthread may be // exhausted, so we put this to a local thread pool to process - _tablet_worker_pool.offer([request, response, done, this]() { + int64_t submit_task_time_ns = MonotonicNanos(); + _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { + int64_t wait_execution_time_us = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { @@ -124,7 +126,8 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr } st.to_protobuf(response->mutable_status()); } - response->set_execution_time_us(execution_time_ns / 1000); + response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); + response->set_wait_execution_time_us(execution_time_ns / NANOS_PER_MICRO); }); } diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 40da5db0b4b616..84f1f4330506ae 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -789,6 +789,12 @@ Default:100 Max number of txns for every txn_partition_map in txn manager, this is a self protection to avoid too many txns saving in manager +### `max_send_batch_parallelism` + +* Type: int +* Description: Max send batch parallelism for OlapTableSink. The value set by the user for `send_batch_parallelism` is not allowed to exceed `max_send_batch_parallelism`, if exceed, the value of `send_batch_parallelism` would be `max_send_batch_parallelism`. +* Default value: 1 + ### `max_tablet_num_per_shard` Default:1024 diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 4334e285a6dbdd..054a104afea7cf 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -345,6 +345,10 @@ Translated with www.DeepL.com/Translator (free version) Not used. +* `send_batch_parallelism` + + Used to set the default parallelism for sending batch when execute InsertStmt operation, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + * `sql_mode` Used to specify SQL mode to accommodate certain SQL dialects. For the SQL mode, see [here](./sql-mode.md). diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 092a88a361d375..37f422bc8bbb5e 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -253,6 +253,8 @@ under the License. timezone: Specify time zones for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, use the "Asia/Shanghai" time zone. + send_batch_parallelism: Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + 5. Load data format sample Integer(TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234 diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index ee0e775ec34d84..abe3af8e417c3f 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -196,6 +196,9 @@ FROM data_source 9. `json_root` json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "". + 10. `send_batch_parallelism` + Integer, Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + 6. data_source The type of data source. Current support: diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index a91da8346d55d5..bdc7844adc1250 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -142,6 +142,8 @@ The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND `read_json_by_line`: Boolean type, true means that one json object can be read per line, and the default value is false. +`send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + RETURN VALUES After the load is completed, the related content of this load will be returned in Json format. Current field included diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 8e637fa9c7d6f8..be7febefd462a1 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -792,6 +792,12 @@ cumulative compaction策略:最大增量文件的数量 txn 管理器中每个 txn_partition_map 的最大 txns 数,这是一种自我保护,以避免在管理器中保存过多的 txns +### `max_send_batch_parallelism` + +* 类型:int +* 描述:OlapTableSink 发送批处理数据的最大并行度,用户为 `send_batch_parallelism` 设置的值不允许超过 `max_send_batch_parallelism` ,如果超过, `send_batch_parallelism` 将被设置为 `max_send_batch_parallelism` 的值。 +* 默认值:1 + ### `max_tablet_num_per_shard` 默认:1024 diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index 423e86f92f2555..84fcadeb160206 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -339,7 +339,10 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); * `resource_group` 暂不使用。 - +* `send_batch_parallelism` + + 用于设置执行 InsertStmt 操作时发送批处理数据的默认并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 + * `sql_mode` 用于指定 SQL 模式,以适应某些 SQL 方言。关于 SQL 模式,可参阅 [这里](./sql-mode.md)。 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index b9edbfb807bff7..105d48e04dad8c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -138,30 +138,30 @@ under the License. 用于指定一些特殊参数。 语法: [PROPERTIES ("key"="value", ...)] - + 可以指定如下参数: - - line_delimiter: 用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。 - fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json格式。 - - jsonpaths: 导入json方式分为:简单模式和匹配模式。 - 简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如: - {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。 - 匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。 - - strip_outer_array: 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如: - [ - {"k1" : 1, "v1" : 2}, - {"k1" : 3, "v1" : 4} - ] - 当strip_outer_array为true,最后导入到doris中会生成两行数据。 - - json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 - - num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。 - - 3. broker_name + line_delimiter: 用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。 + + fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json格式。 + + jsonpaths: 导入json方式分为:简单模式和匹配模式。 + 简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如: + {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。 + 匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。 + + strip_outer_array: 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如: + [ + {"k1" : 1, "v1" : 2}, + {"k1" : 3, "v1" : 4} + ] + 当strip_outer_array为true,最后导入到doris中会生成两行数据。 + + json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + + num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。 + + 3. broker_name 所使用的 broker 名称,可以通过 show broker 命令查看。 @@ -246,6 +246,7 @@ under the License. exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。 strict mode: 是否对数据进行严格限制。默认为 false。 timezone: 指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 "Asia/Shanghai" 时区。 + send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 5. 导入数据格式样例 @@ -537,7 +538,7 @@ under the License. ) with BROKER "hdfs" ("username"="user", "password"="pass"); - 14. 先过滤原始数据,在进行列的映射、转换和过滤操作 + 15. 先过滤原始数据,在进行列的映射、转换和过滤操作 LOAD LABEL example_db.label_filter ( @@ -550,8 +551,8 @@ under the License. WHERE k1 > 3 ) with BROKER "hdfs" ("username"="user", "password"="pass"); - - 15. 导入json文件中数据 指定FORMAT为json, 默认是通过文件后缀判断,设置读取数据的参数 + + 16. 导入json文件中数据 指定FORMAT为json, 默认是通过文件后缀判断,设置读取数据的参数 LOAD LABEL example_db.label9 ( @@ -563,7 +564,7 @@ under the License. ) WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); - 16. LOAD WITH HDFS, 普通HDFS集群 + 17. LOAD WITH HDFS, 普通HDFS集群 LOAD LABEL example_db.label_filter ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") @@ -575,7 +576,7 @@ under the License. "fs.defaultFS"="hdfs://testFs", "hdfs_user"="user" ); - 17. LOAD WITH HDFS, 带ha的HDFS集群 + 17. LOAD WITH HDFS, 带ha的HDFS集群 LOAD LABEL example_db.label_filter ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") @@ -595,4 +596,4 @@ under the License. ## keyword - BROKER,LOAD + BROKER,LOAD \ No newline at end of file diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 4200ad4b04bdc8..dffe666043e154 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -179,7 +179,11 @@ under the License. 9. json_root - json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + + 10. send_batch_parallelism + + 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 6. data_source diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 0587e6f243e959..38aee0c267b2ea 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -106,6 +106,8 @@ under the License. num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。 read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。 + + send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index ffd782a5975357..67f9beb094a3e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -116,6 +116,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private static final String NAME_TYPE = "ROUTINE LOAD NAME"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) @@ -132,6 +133,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .add(EXEC_MEM_LIMIT_PROPERTY) + .add(SEND_BATCH_PARALLELISM) .build(); private final LabelName labelName; @@ -154,6 +156,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private boolean strictMode = true; private long execMemLimit = 2 * 1024 * 1024 * 1024L; private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + private int sendBatchParallelism = 1; /** * RoutineLoad support json data. * Require Params: @@ -175,6 +178,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final Predicate MAX_BATCH_ROWS_PRED = (v) -> v >= 200000; public static final Predicate MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; public static final Predicate EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L; + public static final Predicate SEND_BATCH_PARALLELISM_PRED = (v) -> v > 0L; public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, Map jobProperties, String typeName, @@ -232,6 +236,10 @@ public long getExecMemLimit() { return execMemLimit; } + public int getSendBatchParallelism() { + return sendBatchParallelism; + } + public boolean isStrictMode() { return strictMode; } @@ -434,6 +442,11 @@ private void checkJobProperties() throws UserException { LoadStmt.STRICT_MODE + " should be a boolean"); execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY), RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, EXEC_MEM_LIMIT_PROPERTY + "should > 0"); + + sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM), + RoutineLoadJob.DEFAULT_SEND_BATCH_PARALLELISM, SEND_BATCH_PARALLELISM_PRED, + SEND_BATCH_PARALLELISM + " should > 0")).intValue(); + if (ConnectContext.get() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 7e31600d66afbe..64e3525898b7ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -323,7 +323,8 @@ public void analyze(Analyzer analyzer) throws UserException { if (!isExplain() && targetTable instanceof OlapTable) { OlapTableSink sink = (OlapTableSink) dataSink; TUniqueId loadId = analyzer.getContext().queryId(); - sink.init(loadId, transactionId, db.getId(), timeoutSecond); + int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism(); + sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index efcfe5b894a3b9..368d6c7c8a0de9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -81,6 +81,7 @@ public class LoadStmt extends DdlStmt { public static final String STRICT_MODE = "strict_mode"; public static final String TIMEZONE = "timezone"; public static final String LOAD_PARALLELISM = "load_parallelism"; + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; // for load data from Baidu Object Store(BOS) public static final String BOS_ENDPOINT = "bos_endpoint"; @@ -114,7 +115,6 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column"; public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col"; public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id"; - private final LabelName label; private final List dataDescriptions; private final BrokerDesc brokerDesc; @@ -162,6 +162,12 @@ public class LoadStmt extends DdlStmt { return Integer.valueOf(s); } }) + .put(SEND_BATCH_PARALLELISM, new Function() { + @Override + public @Nullable Integer apply(@Nullable String s) { + return Integer.valueOf(s); + } + }) .put(CLUSTER_PROPERTY, new Function() { @Override public @Nullable String apply(@Nullable String s) { @@ -289,6 +295,19 @@ public static void checkProperties(Map properties) throws DdlExc properties.put(TIMEZONE, TimeUtils.checkTimeZoneValidAndStandardize( properties.getOrDefault(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE))); } + + // send batch parallelism + final String sendBatchParallelism = properties.get(SEND_BATCH_PARALLELISM); + if (sendBatchParallelism != null) { + try { + final int sendBatchParallelismValue = Integer.valueOf(sendBatchParallelism); + if (sendBatchParallelismValue < 1) { + throw new DdlException(SEND_BATCH_PARALLELISM + " must be greater than 0"); + } + } catch (NumberFormatException e) { + throw new DdlException(SEND_BATCH_PARALLELISM + " is not a number."); + } + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index e001b87d51fe9c..93f60ac8be23f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -199,7 +199,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, brokerFileGroups, getDeadlineMs(), getExecMemLimit(), isStrictMode(), transactionId, this, getTimeZone(), getTimeout(), - getLoadParallelism(), isReportSuccess ? jobProfile : null); + getLoadParallelism(), getSendBatchParallelism(), isReportSuccess ? jobProfile : null); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); task.init(loadId, attachment.getFileStatusByTable(aggKey), diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index e83094ea687799..d41f7b3437ee56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -383,6 +383,7 @@ private void initDefaultJobProperties() { jobProperties.put(LoadStmt.STRICT_MODE, false); jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE); jobProperties.put(LoadStmt.LOAD_PARALLELISM, Config.default_load_parallelism); + jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1); } public void isJobTypeRead(boolean jobTypeRead) { @@ -1193,6 +1194,10 @@ public int getLoadParallelism() { return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM); } + public int getSendBatchParallelism() { + return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM); + } + // Return true if this job is finished for a long time public boolean isExpired(long currentTimeMs) { if (!isCompleted()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 7510e61fc83e05..243e95a9a4b529 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -64,6 +64,7 @@ public class LoadLoadingTask extends LoadTask { // timeout of load job, in seconds private final long timeoutS; private final int loadParallelism; + private final int sendBatchParallelism; private LoadingTaskPlanner planner; @@ -74,7 +75,7 @@ public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, - long timeoutS, int loadParallelism, RuntimeProfile profile) { + long timeoutS, int loadParallelism, int sendBatchParallelism, RuntimeProfile profile) { super(callback, TaskType.LOADING); this.db = db; this.table = table; @@ -89,13 +90,14 @@ public LoadLoadingTask(Database db, OlapTable table, this.timezone = timezone; this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; + this.sendBatchParallelism = sendBatchParallelism; this.jobProfile = profile; } public void init(TUniqueId loadId, List> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, - brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, this.loadParallelism, userInfo); + brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, userInfo); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 4d2dbe434d95c1..b490dcaebfb5d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -67,6 +67,7 @@ public class LoadingTaskPlanner { private final boolean strictMode; private final long timeoutS; // timeout of load job, in second private final int loadParallelism; + private final int sendBatchParallelism; private UserIdentity userInfo; // Something useful // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase() @@ -82,7 +83,7 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, boolean strictMode, String timezone, long timeoutS, int loadParallelism, - UserIdentity userInfo) { + int sendBatchParallelism, UserIdentity userInfo) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -93,6 +94,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.analyzer.setTimezone(timezone); this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; + this.sendBatchParallelism = sendBatchParallelism; this.userInfo = userInfo; if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo, Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) { @@ -131,7 +133,7 @@ public void plan(TUniqueId loadId, List> fileStatusesLis // 2. Olap table sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds); - olapTableSink.init(loadId, txnId, dbId, timeoutS); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism); olapTableSink.complete(); // 3. Plan fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a758d65bb38d78..25563d29abd80c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -109,6 +109,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB public static final long DEFAULT_EXEC_MEM_LIMIT = 2 * 1024 * 1024 * 1024L; public static final boolean DEFAULT_STRICT_MODE = false; // default is false + public static final int DEFAULT_SEND_BATCH_PARALLELISM = 1; protected static final String STAR_STRING = "*"; /* @@ -168,6 +169,7 @@ public boolean isFinalState() { // if current error rate is more than max error rate, the job will be paused protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT; + protected int sendBatchParallelism = DEFAULT_SEND_BATCH_PARALLELISM; // include strict mode protected Map jobProperties = Maps.newHashMap(); @@ -284,8 +286,14 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (stmt.getExecMemLimit() != -1) { this.execMemLimit = stmt.getExecMemLimit(); } + if (stmt.getSendBatchParallelism() > 0) { + this.sendBatchParallelism = stmt.getSendBatchParallelism(); + } jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone()); jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode())); + jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(stmt.getExecMemLimit())); + jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(stmt.getSendBatchParallelism())); + if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false"); @@ -555,6 +563,11 @@ public boolean isFuzzyParse() { return Boolean.valueOf(jobProperties.get(PROPS_FUZZY_PARSE)); } + @Override + public int getSendBatchParallelism() { + return sendBatchParallelism; + } + @Override public boolean isReadJsonByLine() { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java index 0ffe906d2be630..6fed2cd1c82ec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java @@ -90,7 +90,8 @@ public void plan(long txnId) throws UserException { // 2. gen olap table sink OlapTableSink olapTableSink = new OlapTableSink(targetTable, computeTargetTupleDesc(), null); olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId, - analyzer.getContext().getSessionVariable().queryTimeoutS); + analyzer.getContext().getSessionVariable().queryTimeoutS, + analyzer.getContext().getSessionVariable().sendBatchParallelism); olapTableSink.complete(); // 3. gen plan fragment PlanFragment planFragment = new PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 32683743b753bb..420c3997589441 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -93,12 +93,13 @@ public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout(), taskInfo.getSendBatchParallelism()); olapTableSink.complete(); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 0e15a319690c9c..6e7eb8380f9772 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -142,6 +142,13 @@ public class SessionVariable implements Serializable, Writable { public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition"; + public static final String EXTRACT_WIDE_RANGE_EXPR = "extract_wide_range_expr"; + + // set the default parallelism for send batch when execute InsertStmt operation, + // if the value for parallelism exceed `max_send_batch_parallelism` in BE config, + // then the coordinator be will use the value of `max_send_batch_parallelism` + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; public static final String EXTRACT_WIDE_RANGE_EXPR = "extract_wide_range_expr"; @@ -341,6 +348,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DELETE_WITHOUT_PARTITION, needForward = true) public boolean deleteWithoutPartition = false; + @VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true) + public int sendBatchParallelism = 1; + @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true) public boolean extractWideRangeExpr = true; @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE) @@ -758,7 +768,7 @@ public void clearSessionOriginValue() { public boolean isDeleteWithoutPartition() { return deleteWithoutPartition; } - + public boolean isExtractWideRangeExpr() { return extractWideRangeExpr; } @@ -767,6 +777,10 @@ public int getCpuResourceLimit() { return cpuResourceLimit; } + public int getSendBatchParallelism() { + return sendBatchParallelism; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 3bf88c1cac7e1b..6deae19c211800 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -57,6 +57,7 @@ public interface LoadTaskInfo { public Expr getWhereExpr(); public Separator getColumnSeparator(); public Separator getLineDelimiter(); + public int getSendBatchParallelism(); public static class ImportColumnDescs { public List descs = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index f6723b38a9bc5a..b47b91a1b6abdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -73,6 +73,7 @@ public class StreamLoadTask implements LoadTaskInfo { private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete private Expr deleteCondition; private String sequenceCol; + private int sendBatchParallelism = 1; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -123,6 +124,11 @@ public Separator getLineDelimiter() { return lineDelimiter; } + @Override + public int getSendBatchParallelism() { + return sendBatchParallelism; + } + public PartitionNames getPartitions() { return partitions; } @@ -288,6 +294,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetSequenceCol()) { sequenceCol = request.getSequenceCol(); } + if (request.isSetSendBatchParallelism()) { + sendBatchParallelism = request.getSendBatchParallelism(); + } } // used for stream load diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 894ed2f25bccbd..5ef4dcba4afda8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -361,8 +361,8 @@ public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttac TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); RuntimeProfile jobProfile = new RuntimeProfile("test"); LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups, - 100, 100, false, 100, callback, "", 100, 1, - jobProfile); + 100, 100, false, 100, callback, "", + 100, 1, 1, jobProfile); try { UserIdentity userInfo = new UserIdentity("root", "localhost"); userInfo.setIsAnalyzed(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index 0e4e764c8ba0a8..d0e4daf89563ae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -101,7 +101,7 @@ public void testSinglePartition() throws UserException { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L)); - sink.init(new TUniqueId(1, 2), 3, 4, 1000); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); sink.complete(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -131,7 +131,7 @@ public void testRangePartition( }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId())); - sink.init(new TUniqueId(1, 2), 3, 4, 1000); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); try { sink.complete(); } catch (UserException e) { @@ -153,7 +153,7 @@ public void testRangeUnknownPartition( }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId)); - sink.init(new TUniqueId(1, 2), 3, 4, 1000); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); sink.complete(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -183,7 +183,7 @@ public void testListPartition( }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId())); - sink.init(new TUniqueId(1, 2), 3, 4, 1000); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); try { sink.complete(); } catch (UserException e) { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index b3b93fb305537e..0b81b492bf884c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -101,6 +101,7 @@ message PTabletWriterAddBatchResult { repeated PTabletInfo tablet_vec = 2; optional int64 execution_time_us = 3; optional int64 wait_lock_time_us = 4; + optional int64 wait_execution_time_us = 5; }; // tablet writer cancel diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 3aa1d106b92a04..44ac3e023aa0be 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -134,6 +134,7 @@ struct TOlapTableSink { 12: required Descriptors.TOlapTableLocationParam location 13: required Descriptors.TPaloNodesInfo nodes_info 14: optional i64 load_channel_timeout_s // the timeout of load channels in second + 15: optional i32 send_batch_parallelism; } struct TDataSink { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 78e4ea3041c9f9..efacdd7c7a08ee 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -591,6 +591,7 @@ struct TStreamLoadPutRequest { 32: optional string line_delimiter 33: optional bool read_json_by_line 34: optional string auth_code_uuid + 35: optional i32 send_batch_parallelism; } struct TStreamLoadPutResult { From c36670d5c8d55a49dfee85b0efa467c04f42aac0 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 9 Aug 2021 10:45:45 +0800 Subject: [PATCH 02/11] fix --- be/src/exec/tablet_sink.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 44883ed27a0d31..4a9a927515689f 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -1015,13 +1015,12 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* void OlapTableSink::_send_batch_process() { SCOPED_TIMER(_non_blocking_send_timer); std::unique_ptr thread_pool = nullptr; - if (_send_batch_parallelism > 1) { + int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism); + if (send_batch_parallelism > 1) { int send_batch_pool_queue_size = 0; for (auto index_channel : _channels) { send_batch_pool_queue_size += index_channel->num_node_channels(); } - int32_t send_batch_parallelism = _send_batch_parallelism <= config::max_send_batch_parallelism ? - _send_batch_parallelism : config::max_send_batch_parallelism; auto s = ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(send_batch_parallelism) .set_max_threads(send_batch_parallelism) From b95faf6930a9f6674cdf8d3747a8b916aac6affa Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 23 Aug 2021 18:51:13 +0800 Subject: [PATCH 03/11] fix bug for race condition --- be/src/exec/tablet_sink.cpp | 6 ++++-- be/src/exec/tablet_sink.h | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 4a9a927515689f..d8ed938a023eba 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -347,8 +347,9 @@ int NodeChannel::try_send_and_fetch_status(std::unique_ptr& thread_p if (!st.ok()) { return 0; } - - if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { + bool is_finished = true; + if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 && + _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) { if (!thread_pool || !thread_pool->submit_func([this]() { this->try_send_batch(); }).ok()) { @@ -411,6 +412,7 @@ void NodeChannel::try_send_batch() { &_add_batch_closure->result, _add_batch_closure); _next_packet_seq++; + _last_patch_processed_finished = true; } Status NodeChannel::none_of(std::initializer_list vars) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 4d1746a83593b6..15785e9515e33a 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -230,6 +230,8 @@ class NodeChannel { // add batches finished means the last rpc has be response, used to check whether this channel can be closed std::atomic _add_batches_finished{false}; + std::atomic _last_patch_processed_finished{true}; + bool _eos_is_produced{false}; // only for restricting producer behaviors std::unique_ptr _row_desc; From f167762f559ddeef5fe5e646a3395c08044265fe Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 23 Aug 2021 19:41:26 +0800 Subject: [PATCH 04/11] fix --- .../sql-statements/Data Manipulation/BROKER LOAD.md | 10 +++++----- .../sql-statements/Data Manipulation/BROKER LOAD.md | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 37f422bc8bbb5e..f51fb1beee4f5d 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -501,7 +501,7 @@ under the License. ) WITH BROKER "hdfs" ("username"="user", "password"="pass"); - 13. Load a batch of data from HDFS, specify timeout and filtering ratio. Use the broker with the plaintext ugi my_hdfs_broker. Simple authentication. delete the data when v2 >100, other append + 12. Load a batch of data from HDFS, specify timeout and filtering ratio. Use the broker with the plaintext ugi my_hdfs_broker. Simple authentication. delete the data when v2 >100, other append LOAD LABEL example_db.label1 ( @@ -522,7 +522,7 @@ under the License. "max_filter_ratio" = "0.1" ); - 14. Filter the original data first, and perform column mapping, conversion and filtering operations + 13. Filter the original data first, and perform column mapping, conversion and filtering operations LOAD LABEL example_db.label_filter ( @@ -536,7 +536,7 @@ under the License. ) with BROKER "hdfs" ("username"="user", "password"="pass"); - 15. Import the data in the json file, and specify format as json, it is judged by the file suffix by default, set parameters for reading data + 14. Import the data in the json file, and specify format as json, it is judged by the file suffix by default, set parameters for reading data LOAD LABEL example_db.label9 ( @@ -548,7 +548,7 @@ under the License. ) WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); - 16. LOAD WITH HDFS, normal HDFS cluster + 15. LOAD WITH HDFS, normal HDFS cluster LOAD LABEL example_db.label_filter ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") @@ -560,7 +560,7 @@ under the License. "fs.defaultFS"="hdfs://testFs", "hdfs_user"="user" ); - 17. LOAD WITH HDFS, hdfs ha + 16. LOAD WITH HDFS, hdfs ha LOAD LABEL example_db.label_filter ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 105d48e04dad8c..ae8cf1a983cbce 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -576,7 +576,7 @@ under the License. "fs.defaultFS"="hdfs://testFs", "hdfs_user"="user" ); - 17. LOAD WITH HDFS, 带ha的HDFS集群 + 18. LOAD WITH HDFS, 带ha的HDFS集群 LOAD LABEL example_db.label_filter ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") From ae3727bdbd762c6ef21dcdc223600f0cdbb554ec Mon Sep 17 00:00:00 2001 From: caiconghui Date: Wed, 25 Aug 2021 20:41:03 +0800 Subject: [PATCH 05/11] add thread pool to limit all send batch task --- be/src/common/config.h | 5 +++ be/src/exec/tablet_sink.cpp | 34 +++++-------------- be/src/exec/tablet_sink.h | 4 ++- be/src/runtime/exec_env.h | 2 ++ be/src/runtime/exec_env_init.cpp | 18 ++++++++++ be/src/util/doris_metrics.h | 2 ++ be/src/util/threadpool.h | 5 +++ be/test/exec/tablet_sink_test.cpp | 6 +++- .../org/apache/doris/qe/SessionVariable.java | 2 -- 9 files changed, 49 insertions(+), 29 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index aa818211551171..13408091c73b17 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -627,6 +627,11 @@ CONF_mBool(runtime_filter_use_async_rpc, "true"); CONF_mInt32(max_send_batch_parallelism, "1"); CONF_Validator(max_send_batch_parallelism, [](const int config) -> bool { return config >= 1; }); +// number of send batch thread pool size +CONF_Int32(doris_send_batch_thread_pool_thread_num, "256"); +// number of send batch thread pool queue size +CONF_Int32(doris_send_batch_thread_pool_queue_size, "102400"); + } // namespace config } // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index d8ed938a023eba..48e51107ad6042 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -342,7 +342,7 @@ void NodeChannel::cancel() { request.release_id(); } -int NodeChannel::try_send_and_fetch_status(std::unique_ptr& thread_pool) { +int NodeChannel::try_send_and_fetch_status(std::unique_ptr& thread_pool_token) { auto st = none_of({_cancelled, _send_finished}); if (!st.ok()) { return 0; @@ -350,13 +350,11 @@ int NodeChannel::try_send_and_fetch_status(std::unique_ptr& thread_p bool is_finished = true; if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 && _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) { - if (!thread_pool || !thread_pool->submit_func([this]() { - this->try_send_batch(); - }).ok()) { - try_send_batch(); + auto s = thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this)); + if (!s.ok()) { + _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); } } - return _send_finished ? 0 : 1; } @@ -687,7 +685,9 @@ Status OlapTableSink::open(RuntimeState* state) { return Status::InternalError(ss.str()); } } - + int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism); + _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( + ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism); RETURN_IF_ERROR(Thread::create( "OlapTableSink", "send_batch_process", [this]() { this->_send_batch_process(); }, &_sender_thread)); @@ -1016,27 +1016,11 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* void OlapTableSink::_send_batch_process() { SCOPED_TIMER(_non_blocking_send_timer); - std::unique_ptr thread_pool = nullptr; - int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism); - if (send_batch_parallelism > 1) { - int send_batch_pool_queue_size = 0; - for (auto index_channel : _channels) { - send_batch_pool_queue_size += index_channel->num_node_channels(); - } - auto s = ThreadPoolBuilder("SendBatchThreadPool") - .set_min_threads(send_batch_parallelism) - .set_max_threads(send_batch_parallelism) - .set_max_queue_size(send_batch_pool_queue_size) - .build(&thread_pool); - if (!s.ok()) { - thread_pool.reset(); - } - } do { int running_channels_num = 0; for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&running_channels_num, &thread_pool](NodeChannel* ch) { - running_channels_num += ch->try_send_and_fetch_status(thread_pool); + index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) { + running_channels_num += ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token); }); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 15785e9515e33a..7613e7c8561918 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -45,6 +45,7 @@ class MemTracker; class RuntimeProfile; class RowDescriptor; class ThreadPool; +class ThreadPoolToken; class Tuple; class TupleDescriptor; class ExprContext; @@ -170,7 +171,7 @@ class NodeChannel { // 1: running, haven't reach eos. // only allow 1 rpc in flight // plz make sure, this func should be called after open_wait(). - int try_send_and_fetch_status(std::unique_ptr& thread_pool); + int try_send_and_fetch_status(std::unique_ptr& thread_pool_token); void try_send_batch(); @@ -379,6 +380,7 @@ class OlapTableSink : public DataSink { CountDownLatch _stop_background_threads_latch; scoped_refptr _sender_thread; + std::unique_ptr _send_batch_thread_pool_token; std::vector _max_decimalv2_val; std::vector _min_decimalv2_val; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f92f3b676f93f3..766ca5c1fed706 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -117,6 +117,7 @@ class ExecEnv { PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; } ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); } PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; } + ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } ResultCache* result_cache() { return _result_cache; } @@ -189,6 +190,7 @@ class ExecEnv { PriorityThreadPool* _scan_thread_pool = nullptr; std::unique_ptr _limited_scan_thread_pool; + std::unique_ptr _send_batch_thread_pool; PriorityThreadPool* _etl_thread_pool = nullptr; CgroupsMgr* _cgroups_mgr = nullptr; FragmentMgr* _fragment_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index d4a9d2d259d8f8..814779c7b5541a 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -67,6 +67,8 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption, Labels({{"type", "query"}})); @@ -100,6 +102,12 @@ Status ExecEnv::_init(const std::vector& store_paths) { .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool); + ThreadPoolBuilder("SendBatchThreadPool") + .set_min_threads(1) + .set_max_threads(config::doris_send_batch_thread_pool_thread_num) + .set_max_queue_size(config::doris_send_batch_thread_pool_queue_size) + .build(&_send_batch_thread_pool); + _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size, config::etl_thread_pool_queue_size); _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); @@ -246,11 +254,21 @@ void ExecEnv::_register_metrics() { REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() { return _etl_thread_pool->get_queue_size(); }); + + REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() { + return _send_batch_thread_pool->num_threads(); + }); + + REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() { + return _send_batch_thread_pool->get_queue_size(); + }); } void ExecEnv::_deregister_metrics() { DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(etl_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); } void ExecEnv::_destroy() { diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index f363c736c9e3a1..67d60a3e675788 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -193,6 +193,8 @@ class DorisMetrics { UIntGauge* scanner_thread_pool_queue_size; UIntGauge* etl_thread_pool_queue_size; UIntGauge* add_batch_task_queue_size; + UIntGauge* send_batch_thread_pool_thread_num; + UIntGauge* send_batch_thread_pool_queue_size; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index c228e155d795d7..8d2f26ebab702f 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -199,6 +199,11 @@ class ThreadPool { return _num_threads + _num_threads_pending_start; } + int get_queue_size() const { + MutexLock l(&_lock); + return _total_queued_tasks; + } + private: friend class ThreadPoolBuilder; friend class ThreadPoolToken; diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index b4ec5deb85594a..a7541c2da4d57f 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -54,7 +54,11 @@ class OlapTableSinkTest : public testing::Test { _env->_load_stream_mgr = new LoadStreamMgr(); _env->_brpc_stub_cache = new BrpcStubCache(); _env->_buffer_reservation = new ReservationTracker(); - + ThreadPoolBuilder("SendBatchThreadPool") + .set_min_threads(1) + .set_max_threads(5) + .set_max_queue_size(100) + .build(&_env->_send_batch_thread_pool); config::tablet_writer_open_rpc_timeout_sec = 60; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 6e7eb8380f9772..7309ee774041a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -142,8 +142,6 @@ public class SessionVariable implements Serializable, Writable { public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition"; - public static final String EXTRACT_WIDE_RANGE_EXPR = "extract_wide_range_expr"; - // set the default parallelism for send batch when execute InsertStmt operation, // if the value for parallelism exceed `max_send_batch_parallelism` in BE config, // then the coordinator be will use the value of `max_send_batch_parallelism` From ea4b3109382c3461fd4c8a0fc352b23d58b33882 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Thu, 26 Aug 2021 19:51:48 +0800 Subject: [PATCH 06/11] fix --- be/test/exec/tablet_sink_test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index a7541c2da4d57f..23db269af1cbda 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -60,6 +60,7 @@ class OlapTableSinkTest : public testing::Test { .set_max_queue_size(100) .build(&_env->_send_batch_thread_pool); config::tablet_writer_open_rpc_timeout_sec = 60; + config::max_send_batch_parallelism = 1; } void TearDown() override { From ce01867b48ffa6b14acafe51d5ce93c30fc7cfee Mon Sep 17 00:00:00 2001 From: caiconghui Date: Thu, 26 Aug 2021 20:19:18 +0800 Subject: [PATCH 07/11] add doc --- docs/en/administrator-guide/config/be_config.md | 11 +++++++++++ docs/zh-CN/administrator-guide/config/be_config.md | 12 ++++++++++++ 2 files changed, 23 insertions(+) diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 84f1f4330506ae..436b49f4c738b5 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -483,6 +483,17 @@ The maximum number of data rows returned by each scanning thread in a single exe * Description: The number of threads in the Scanner thread pool. In Doris' scanning tasks, each Scanner will be submitted as a thread task to the thread pool to be scheduled. This parameter determines the size of the Scanner thread pool. * Default value: 48 +### `doris_send_batch_thread_pool_thread_num` + +* Type: int32 +* Description: The number of threads in the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool to be scheduled. This parameter determines the size of the SendBatch thread pool. +* Default value: 256 + +### `doris_send_batch_thread_pool_queue_size` + +* Type: int32 +* Description: The queue length of the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool waiting to be scheduled, and after the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is a empty slot in the queue. + ### `download_low_speed_limit_kbps` Default:50 (KB/s) diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index be7febefd462a1..7e9919f2f84a33 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -480,6 +480,18 @@ CumulativeCompaction会跳过最近发布的增量,以防止压缩可能被查 * 描述:Scanner线程池线程数目。在Doris的扫描任务之中,每一个Scanner会作为一个线程task提交到线程池之中等待被调度,该参数决定了Scanner线程池的大小。 * 默认值:48 +### `doris_send_batch_thread_pool_thread_num` + +* 类型:int32 +* 描述:SendBatch线程池线程数目。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,该参数决定了SendBatch线程池的大小。 +* 默认值:256 + +### `doris_send_batch_thread_pool_queue_size` + +* 类型:int32 +* 描述:SendBatch线程池的队列长度。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。 +* 默认值:102400 + ### `download_low_speed_limit_kbps` 默认值:50 (KB/s) From eb624404b698a65a7008ce0bf0129e099e7a487a Mon Sep 17 00:00:00 2001 From: caiconghui Date: Thu, 26 Aug 2021 23:00:00 +0800 Subject: [PATCH 08/11] fix --- be/src/http/http_common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 2bed1d5de0243c..4ade62ffa47a30 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -52,6 +52,6 @@ static const std::string HTTP_COMPRESS_TYPE = "compress_type"; static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; - static const std::string HTTP_100_CONTINUE = "100-continue"; +static const std::string HTTP_100_CONTINUE = "100-continue"; } // namespace doris From bcc26722c13ab4ebd274459b74b1b9045b83abd6 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Sat, 28 Aug 2021 17:23:58 +0800 Subject: [PATCH 09/11] fix --- be/src/common/config.h | 8 +++---- be/src/exec/tablet_sink.cpp | 2 +- be/src/exec/tablet_sink.h | 6 ++--- be/src/runtime/exec_env_init.cpp | 4 ++-- be/test/exec/tablet_sink_test.cpp | 2 +- .../administrator-guide/config/be_config.md | 22 ++++++++--------- .../administrator-guide/config/be_config.md | 24 +++++++++---------- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 13408091c73b17..6473df8a9e42af 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -624,13 +624,13 @@ CONF_mBool(runtime_filter_use_async_rpc, "true"); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism -CONF_mInt32(max_send_batch_parallelism, "1"); -CONF_Validator(max_send_batch_parallelism, [](const int config) -> bool { return config >= 1; }); +CONF_mInt32(max_send_batch_parallelism_per_job, "5"); +CONF_Validator(max_send_batch_parallelism_per_job, [](const int config) -> bool { return config >= 1; }); // number of send batch thread pool size -CONF_Int32(doris_send_batch_thread_pool_thread_num, "256"); +CONF_Int32(send_batch_thread_pool_thread_num, "256"); // number of send batch thread pool queue size -CONF_Int32(doris_send_batch_thread_pool_queue_size, "102400"); +CONF_Int32(send_batch_thread_pool_queue_size, "102400"); } // namespace config diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 48e51107ad6042..f780971f8ccd4a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -685,7 +685,7 @@ Status OlapTableSink::open(RuntimeState* state) { return Status::InternalError(ss.str()); } } - int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism); + int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job); _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism); RETURN_IF_ERROR(Thread::create( diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 7613e7c8561918..aaaa3bc82393f4 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -418,10 +418,10 @@ class OlapTableSink : public DataSink { int64_t _load_channel_timeout_s = 0; int32_t _send_batch_parallelism = 1; - // True if this sink has been closed once - bool _is_closed = false; + // True if this sink has been closed once bool + bool _is_closed = false; // Save the status of close() method - Status _close_status; + Status _close_status; }; } // namespace stream_load diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 814779c7b5541a..69d70cd5b96f99 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -104,8 +104,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(1) - .set_max_threads(config::doris_send_batch_thread_pool_thread_num) - .set_max_queue_size(config::doris_send_batch_thread_pool_queue_size) + .set_max_threads(config::send_batch_thread_pool_thread_num) + .set_max_queue_size(config::send_batch_thread_pool_queue_size) .build(&_send_batch_thread_pool); _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size, diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index 23db269af1cbda..1aa853d11b6006 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -60,7 +60,7 @@ class OlapTableSinkTest : public testing::Test { .set_max_queue_size(100) .build(&_env->_send_batch_thread_pool); config::tablet_writer_open_rpc_timeout_sec = 60; - config::max_send_batch_parallelism = 1; + config::max_send_batch_parallelism_per_job = 1; } void TearDown() override { diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 436b49f4c738b5..430a5c5d9e5a7e 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -483,17 +483,6 @@ The maximum number of data rows returned by each scanning thread in a single exe * Description: The number of threads in the Scanner thread pool. In Doris' scanning tasks, each Scanner will be submitted as a thread task to the thread pool to be scheduled. This parameter determines the size of the Scanner thread pool. * Default value: 48 -### `doris_send_batch_thread_pool_thread_num` - -* Type: int32 -* Description: The number of threads in the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool to be scheduled. This parameter determines the size of the SendBatch thread pool. -* Default value: 256 - -### `doris_send_batch_thread_pool_queue_size` - -* Type: int32 -* Description: The queue length of the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool waiting to be scheduled, and after the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is a empty slot in the queue. - ### `download_low_speed_limit_kbps` Default:50 (KB/s) @@ -1073,6 +1062,17 @@ Default:5 This configuration is used for the context gc thread scheduling cycle. Note: The unit is minutes, and the default is 5 minutes +### `send_batch_thread_pool_thread_num` + +* Type: int32 +* Description: The number of threads in the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool to be scheduled. This parameter determines the size of the SendBatch thread pool. +* Default value: 256 + +### `send_batch_thread_pool_queue_size` + +* Type: int32 +* Description: The queue length of the SendBatch thread pool. In NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel will be submitted as a thread task to the thread pool waiting to be scheduled, and after the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is a empty slot in the queue. + ### `sleep_one_second` + Type: int32 diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 7e9919f2f84a33..f41817ea4dee69 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -480,18 +480,6 @@ CumulativeCompaction会跳过最近发布的增量,以防止压缩可能被查 * 描述:Scanner线程池线程数目。在Doris的扫描任务之中,每一个Scanner会作为一个线程task提交到线程池之中等待被调度,该参数决定了Scanner线程池的大小。 * 默认值:48 -### `doris_send_batch_thread_pool_thread_num` - -* 类型:int32 -* 描述:SendBatch线程池线程数目。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,该参数决定了SendBatch线程池的大小。 -* 默认值:256 - -### `doris_send_batch_thread_pool_queue_size` - -* 类型:int32 -* 描述:SendBatch线程池的队列长度。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。 -* 默认值:102400 - ### `download_low_speed_limit_kbps` 默认值:50 (KB/s) @@ -1077,6 +1065,18 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren 此配置用于上下文gc线程调度周期 , 注意:单位为分钟,默认为 5 分钟 +### `send_batch_thread_pool_thread_num` + +* 类型:int32 +* 描述:SendBatch线程池线程数目。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,该参数决定了SendBatch线程池的大小。 +* 默认值:256 + +### `send_batch_thread_pool_queue_size` + +* 类型:int32 +* 描述:SendBatch线程池的队列长度。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。 +* 默认值:102400 + ### `serialize_batch` 默认值:false From 69f4e87839fa846cb34908fc834da92e13dbd71c Mon Sep 17 00:00:00 2001 From: caiconghui Date: Sat, 28 Aug 2021 22:02:10 +0800 Subject: [PATCH 10/11] fix --- be/src/common/config.h | 6 +++--- be/src/service/internal_service.cpp | 4 ++-- docs/en/administrator-guide/config/be_config.md | 4 ++-- docs/en/administrator-guide/variables.md | 2 +- .../sql-statements/Data Manipulation/BROKER LOAD.md | 2 +- .../sql-statements/Data Manipulation/ROUTINE LOAD.md | 2 +- .../sql-statements/Data Manipulation/STREAM LOAD.md | 2 +- docs/zh-CN/administrator-guide/config/be_config.md | 4 ++-- docs/zh-CN/administrator-guide/variables.md | 2 +- .../sql-statements/Data Manipulation/BROKER LOAD.md | 2 +- .../sql-statements/Data Manipulation/ROUTINE LOAD.md | 2 +- .../sql-statements/Data Manipulation/STREAM LOAD.md | 2 +- .../org/apache/doris/analysis/CreateRoutineLoadStmt.java | 2 +- .../src/main/java/org/apache/doris/load/loadv2/LoadJob.java | 1 + .../org/apache/doris/load/routineload/RoutineLoadJob.java | 4 ++-- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 4 ++-- gensrc/thrift/DataSinks.thrift | 6 +++--- gensrc/thrift/FrontendService.thrift | 2 +- 18 files changed, 27 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 6473df8a9e42af..eb42572ee58db6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -622,13 +622,13 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); CONF_mBool(runtime_filter_use_async_rpc, "true"); // max send batch parallelism for OlapTableSink -// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism, -// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism +// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, +// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job CONF_mInt32(max_send_batch_parallelism_per_job, "5"); CONF_Validator(max_send_batch_parallelism_per_job, [](const int config) -> bool { return config >= 1; }); // number of send batch thread pool size -CONF_Int32(send_batch_thread_pool_thread_num, "256"); +CONF_Int32(send_batch_thread_pool_thread_num, "64"); // number of send batch thread pool queue size CONF_Int32(send_batch_thread_pool_queue_size, "102400"); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index adad26bc200307..021324f0377257 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -112,7 +112,7 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { - int64_t wait_execution_time_us = MonotonicNanos() - submit_task_time_ns; + int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { @@ -127,7 +127,7 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr st.to_protobuf(response->mutable_status()); } response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); - response->set_wait_execution_time_us(execution_time_ns / NANOS_PER_MICRO); + response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); } diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 430a5c5d9e5a7e..dd74fb70002ef6 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -789,10 +789,10 @@ Default:100 Max number of txns for every txn_partition_map in txn manager, this is a self protection to avoid too many txns saving in manager -### `max_send_batch_parallelism` +### `max_send_batch_parallelism_per_job` * Type: int -* Description: Max send batch parallelism for OlapTableSink. The value set by the user for `send_batch_parallelism` is not allowed to exceed `max_send_batch_parallelism`, if exceed, the value of `send_batch_parallelism` would be `max_send_batch_parallelism`. +* Description: Max send batch parallelism for OlapTableSink. The value set by the user for `send_batch_parallelism` is not allowed to exceed `max_send_batch_parallelism_per_job`, if exceed, the value of `send_batch_parallelism` would be `max_send_batch_parallelism_per_job`. * Default value: 1 ### `max_tablet_num_per_shard` diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 054a104afea7cf..d5e1bf62b2dd8c 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -347,7 +347,7 @@ Translated with www.DeepL.com/Translator (free version) * `send_batch_parallelism` - Used to set the default parallelism for sending batch when execute InsertStmt operation, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + Used to set the default parallelism for sending batch when execute InsertStmt operation, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. * `sql_mode` diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index f51fb1beee4f5d..a689a1ca1c87ab 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -253,7 +253,7 @@ under the License. timezone: Specify time zones for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, use the "Asia/Shanghai" time zone. - send_batch_parallelism: Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + send_batch_parallelism: Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. 5. Load data format sample diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index abe3af8e417c3f..edaa2f5fcd589a 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -197,7 +197,7 @@ FROM data_source json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "". 10. `send_batch_parallelism` - Integer, Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. + Integer, Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. 6. data_source diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index bdc7844adc1250..77db4daa33d09d 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -142,7 +142,7 @@ The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND `read_json_by_line`: Boolean type, true means that one json object can be read per line, and the default value is false. -`send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. +`send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. RETURN VALUES diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index f41817ea4dee69..08adfca8f15f54 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -792,10 +792,10 @@ cumulative compaction策略:最大增量文件的数量 txn 管理器中每个 txn_partition_map 的最大 txns 数,这是一种自我保护,以避免在管理器中保存过多的 txns -### `max_send_batch_parallelism` +### `max_send_batch_parallelism_per_job` * 类型:int -* 描述:OlapTableSink 发送批处理数据的最大并行度,用户为 `send_batch_parallelism` 设置的值不允许超过 `max_send_batch_parallelism` ,如果超过, `send_batch_parallelism` 将被设置为 `max_send_batch_parallelism` 的值。 +* 描述:OlapTableSink 发送批处理数据的最大并行度,用户为 `send_batch_parallelism` 设置的值不允许超过 `max_send_batch_parallelism_per_job` ,如果超过, `send_batch_parallelism` 将被设置为 `max_send_batch_parallelism_per_job` 的值。 * 默认值:1 ### `max_tablet_num_per_shard` diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index 84fcadeb160206..a5457cef77024e 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -341,7 +341,7 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); 暂不使用。 * `send_batch_parallelism` - 用于设置执行 InsertStmt 操作时发送批处理数据的默认并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 + 用于设置执行 InsertStmt 操作时发送批处理数据的默认并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 * `sql_mode` diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index ae8cf1a983cbce..73e2abb3a92f5c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -246,7 +246,7 @@ under the License. exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。 strict mode: 是否对数据进行严格限制。默认为 false。 timezone: 指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 "Asia/Shanghai" 时区。 - send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 + send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 5. 导入数据格式样例 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index dffe666043e154..798ff6962b94d9 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -183,7 +183,7 @@ under the License. 10. send_batch_parallelism - 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 + 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 6. data_source diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 38aee0c267b2ea..ef0c4c46b8a74f 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -107,7 +107,7 @@ under the License. read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。 - send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism` 的值。 + send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 67f9beb094a3e1..335ff172304587 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -444,7 +444,7 @@ private void checkJobProperties() throws UserException { RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, EXEC_MEM_LIMIT_PROPERTY + "should > 0"); sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM), - RoutineLoadJob.DEFAULT_SEND_BATCH_PARALLELISM, SEND_BATCH_PARALLELISM_PRED, + ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED, SEND_BATCH_PARALLELISM + " should > 0")).intValue(); if (ConnectContext.get() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index d41f7b3437ee56..e33e98bf7e3277 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -337,6 +337,7 @@ public void setJobProperties(Map properties) throws DdlException if (ConnectContext.get() != null) { jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, ConnectContext.get().getSessionVariable().getMaxExecMemByte()); jobProperties.put(LoadStmt.TIMEZONE, ConnectContext.get().getSessionVariable().getTimeZone()); + jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, ConnectContext.get().getSessionVariable().getSendBatchParallelism()); } if (properties == null || properties.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 25563d29abd80c..edca51188cbfed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -291,8 +291,8 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone()); jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode())); - jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(stmt.getExecMemLimit())); - jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(stmt.getSendBatchParallelism())); + jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(this.execMemLimit)); + jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism)); if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7309ee774041a0..25d2778940ef4c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -143,8 +143,8 @@ public class SessionVariable implements Serializable, Writable { public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition"; // set the default parallelism for send batch when execute InsertStmt operation, - // if the value for parallelism exceed `max_send_batch_parallelism` in BE config, - // then the coordinator be will use the value of `max_send_batch_parallelism` + // if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, + // then the coordinator be will use the value of `max_send_batch_parallelism_per_job` public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 44ac3e023aa0be..b8fe8b22d364b0 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -76,7 +76,7 @@ struct TDataStreamSink { struct TResultSink { 1: optional TResultSinkType type; - 2: optional TResultFileSinkOptions file_options; + 2: optional TResultFileSinkOptions file_options } struct TMysqlTableSink { @@ -116,7 +116,7 @@ struct TExportSink { 4: required string line_delimiter // properties need to access broker. 5: optional list broker_addresses - 6: optional map properties; + 6: optional map properties } struct TOlapTableSink { @@ -134,7 +134,7 @@ struct TOlapTableSink { 12: required Descriptors.TOlapTableLocationParam location 13: required Descriptors.TPaloNodesInfo nodes_info 14: optional i64 load_channel_timeout_s // the timeout of load channels in second - 15: optional i32 send_batch_parallelism; + 15: optional i32 send_batch_parallelism } struct TDataSink { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index efacdd7c7a08ee..374e282d652b76 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -591,7 +591,7 @@ struct TStreamLoadPutRequest { 32: optional string line_delimiter 33: optional bool read_json_by_line 34: optional string auth_code_uuid - 35: optional i32 send_batch_parallelism; + 35: optional i32 send_batch_parallelism } struct TStreamLoadPutResult { From 4681dc7c66bbc7a3e9d078a27905302b81c06ca3 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Sun, 29 Aug 2021 09:39:17 +0800 Subject: [PATCH 11/11] fix comment --- be/src/exec/tablet_sink.h | 4 ++-- .../sql-statements/Data Manipulation/STREAM LOAD.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index aaaa3bc82393f4..277ecefb86545b 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -418,9 +418,9 @@ class OlapTableSink : public DataSink { int64_t _load_channel_timeout_s = 0; int32_t _send_batch_parallelism = 1; - // True if this sink has been closed once bool + // True if this sink has been closed once bool bool _is_closed = false; - // Save the status of close() method + // Save the status of close() method Status _close_status; }; diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 77db4daa33d09d..9a8e06301ff915 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -142,7 +142,7 @@ The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND `read_json_by_line`: Boolean type, true means that one json object can be read per line, and the default value is false. -`send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism`. +`send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. RETURN VALUES