Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <algorithm>
#include <chrono> // IWYU pragma: keep
#include <ostream>
#include <string>
Expand Down Expand Up @@ -130,16 +131,21 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
}

if (_waiting_rpc.empty()) {
size_t bytes = 0;
std::for_each(result->result_batch.rows.cbegin(), result->result_batch.rows.cend(),
[&bytes](const std::string& row) { bytes += row.size(); });
// Merge result into batch to reduce rpc times
if (!_batch_queue.empty() &&
((_batch_queue.back()->result_batch.rows.size() + num_rows) < _buffer_limit) &&
!result->eos) {
!result->eos && (bytes + _last_batch_bytes) <= config::thrift_max_message_size) {
std::vector<std::string>& back_rows = _batch_queue.back()->result_batch.rows;
std::vector<std::string>& result_rows = result->result_batch.rows;
back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
std::make_move_iterator(result_rows.end()));
_last_batch_bytes += bytes;
} else {
_batch_queue.push_back(std::move(result));
_last_batch_bytes = bytes;
}
_buffer_rows += num_rows;
_data_arrival.notify_one();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class BufferControlBlock {
std::atomic_int _buffer_rows;
const int _buffer_limit;
int64_t _packet_num;
size_t _last_batch_bytes = 0;

// blocking queue for batch
ResultQueue _batch_queue;
Expand Down
32 changes: 30 additions & 2 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,8 @@ int VMysqlResultWriter<is_binary_format>::_add_one_cell(const ColumnPtr& column_
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
return status;
return Status::OK();
}

DCHECK(_output_vexpr_ctxs.empty() != true);
Expand All @@ -613,7 +612,36 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));

const auto total_bytes = block.bytes();
if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
const auto total_rows = block.rows();
const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) /
config::thrift_max_message_size;
const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count;

size_t offset = 0;
while (offset < total_rows) {
size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset);
auto sub_block = block.clone_empty();
for (size_t i = 0; i != block.columns(); ++i) {
sub_block.get_by_position(i).column =
block.get_by_position(i).column->cut(offset, rows);
}
offset += rows;

RETURN_IF_ERROR(_append_block(sub_block));
}
return Status::OK();
}

return _append_block(block);
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::_append_block(Block& block) {
// convert one batch
Status status = Status::OK();
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter {
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale = -1);

Status _append_block(Block& block);

BufferControlBlock* _sinker;

const VExprContextSPtrs& _output_vexpr_ctxs;
Expand Down
Loading