diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 62c1763861a59a..da4d8ef4fe69b4 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -23,6 +23,7 @@ #include #include // IWYU pragma: no_include +#include #include // IWYU pragma: keep #include #include @@ -130,16 +131,21 @@ Status BufferControlBlock::add_batch(std::unique_ptr& result) } if (_waiting_rpc.empty()) { + size_t bytes = 0; + std::for_each(result->result_batch.rows.cbegin(), result->result_batch.rows.cend(), + [&bytes](const std::string& row) { bytes += row.size(); }); // Merge result into batch to reduce rpc times if (!_batch_queue.empty() && ((_batch_queue.back()->result_batch.rows.size() + num_rows) < _buffer_limit) && - !result->eos) { + !result->eos && (bytes + _last_batch_bytes) <= config::thrift_max_message_size) { std::vector& back_rows = _batch_queue.back()->result_batch.rows; std::vector& result_rows = result->result_batch.rows; back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), std::make_move_iterator(result_rows.end())); + _last_batch_bytes += bytes; } else { _batch_queue.push_back(std::move(result)); + _last_batch_bytes = bytes; } _buffer_rows += num_rows; _data_arrival.notify_one(); diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 0bb38c54e00e73..4087108c1f0943 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -103,6 +103,7 @@ class BufferControlBlock { std::atomic_int _buffer_rows; const int _buffer_limit; int64_t _packet_num; + size_t _last_batch_bytes = 0; // blocking queue for batch ResultQueue _batch_queue; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 622e3811164644..4ce8a6f827bc4d 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -601,9 +601,8 @@ int VMysqlResultWriter::_add_one_cell(const ColumnPtr& column_ template Status VMysqlResultWriter::append_block(Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); - Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { - return status; + return Status::OK(); } DCHECK(_output_vexpr_ctxs.empty() != true); @@ -613,7 +612,36 @@ Status VMysqlResultWriter::append_block(Block& input_block) { Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); + + const auto total_bytes = block.bytes(); + if (total_bytes > config::thrift_max_message_size) [[unlikely]] { + const auto total_rows = block.rows(); + const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) / + config::thrift_max_message_size; + const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count; + + size_t offset = 0; + while (offset < total_rows) { + size_t rows = std::min(static_cast(sub_block_rows), total_rows - offset); + auto sub_block = block.clone_empty(); + for (size_t i = 0; i != block.columns(); ++i) { + sub_block.get_by_position(i).column = + block.get_by_position(i).column->cut(offset, rows); + } + offset += rows; + + RETURN_IF_ERROR(_append_block(sub_block)); + } + return Status::OK(); + } + + return _append_block(block); +} + +template +Status VMysqlResultWriter::_append_block(Block& block) { // convert one batch + Status status = Status::OK(); auto result = std::make_unique(); auto num_rows = block.rows(); result->result_batch.rows.resize(num_rows); diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index 1761a1a3bef9ad..77f6f8e5e670cd 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter { int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type, MysqlRowBuffer& buffer, int scale = -1); + Status _append_block(Block& block); + BufferControlBlock* _sinker; const VExprContextSPtrs& _output_vexpr_ctxs;