From e639351667affbd4cbbd39395634d655c0f69acb Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 6 Dec 2023 11:40:37 +0800 Subject: [PATCH 1/7] fix ser --- .../serde/data_type_array_serde.cpp | 18 ++++++++--- be/src/vec/sink/vmysql_result_writer.cpp | 31 +++++++++++++------ 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 91dfa8452e2778..d6d85bd31e9b4a 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -299,18 +299,28 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar template Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column, MysqlRowBuffer& result, - int row_idx, bool col_const) const { + int row_idx_of_mysql, bool col_const) const { auto& column_array = assert_cast(column); auto& offsets = column_array.get_offsets(); auto& data = column_array.get_data(); bool is_nested_string = data.is_column_string(); - const auto col_index = index_check_const(row_idx, col_const); + const auto row_idx_of_col_arr = index_check_const(row_idx_of_mysql, col_const); + + if (column_array.size() <= row_idx_of_col_arr) { + return Status::InternalError( + "Logical error, trying to fetch {}-th element of column array, whose size is {}", + row_idx_of_col_arr, column_array.size()); + } + result.open_dynamic_mode(); if (0 != result.push_string("[", 1)) { return Status::InternalError("pack mysql buffer failed."); } - for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) { - if (j != offsets[col_index - 1]) { + + const auto begin_arr_element = offsets[row_idx_of_col_arr - 1]; + const auto end_arr_element = offsets[row_idx_of_col_arr]; + for (int j = begin_arr_element; j < end_arr_element; ++j) { + if (j != begin_arr_element) { if (0 != result.push_string(", ", 2)) { return Status::InternalError("pack mysql buffer failed."); } diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 867cd7dc1d0468..6c6edb1bd99308 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -139,14 +139,16 @@ Status VMysqlResultWriter::append_block(Block& input_block) { }; std::vector arguments; - for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) { - const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column); - int scale = _output_vexpr_ctxs[i]->root()->type().scale; + const size_t num_cols = _output_vexpr_ctxs.size(); + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + const auto& [column_ptr, col_const] = + unpack_if_const(block.get_by_position(col_idx).column); + int scale = _output_vexpr_ctxs[col_idx]->root()->type().scale; // decimalv2 scale and precision is hard code, so we should get real scale and precision // from expr DataTypeSerDeSPtr serde; - if (_output_vexpr_ctxs[i]->root()->type().is_decimal_v2_type()) { - if (_output_vexpr_ctxs[i]->root()->is_nullable()) { + if (_output_vexpr_ctxs[col_idx]->root()->type().is_decimal_v2_type()) { + if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) { auto nested_serde = std::make_shared>(scale, 27); @@ -156,16 +158,25 @@ Status VMysqlResultWriter::append_block(Block& input_block) { 27); } } else { - serde = block.get_by_position(i).type->get_serde(); + serde = block.get_by_position(col_idx).type->get_serde(); } serde->set_return_object_as_string(output_object_data()); arguments.emplace_back(column_ptr.get(), col_const, serde); } - for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) { - for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) { - RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql( - *(arguments[i].column), row_buffer, row_idx, arguments[i].is_const)); + for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) { + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + const auto& argument = arguments[col_idx]; + if (argument.column->size() < num_rows) { + return Status::InternalError( + "Required row size is out of range, need {} rows, column {} has {} " + "rows in " + "fact.", + num_rows, argument.column->get_name(), argument.column->size()); + } + RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql( + *(arguments[col_idx].column), row_buffer, row_idx, + arguments[col_idx].is_const)); } // copy MysqlRowBuffer to Thrift From e0d80ad01f39fb2e6ec8b30b16105583d439d71a Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 6 Dec 2023 17:32:45 +0800 Subject: [PATCH 2/7] Need --- be/src/vec/sink/vmysql_result_writer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 6c6edb1bd99308..e3cb54db5671ff 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -112,17 +112,20 @@ Status VMysqlResultWriter::append_block(Block& input_block) { return status; } + if (UNLIKELY(input_block.columns() != _output_vexpr_ctxs.size())) { + return Status::InternalError("Requiring {} output columns, but just have {} real columns", + _output_vexpr_ctxs.size(), input_block.columns()); + } + // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec // failed, just return the error status Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch auto result = std::make_unique(); auto num_rows = block.rows(); result->result_batch.rows.resize(num_rows); - uint64_t bytes_sent = 0; { SCOPED_TIMER(_convert_tuple_timer); @@ -170,8 +173,7 @@ Status VMysqlResultWriter::append_block(Block& input_block) { if (argument.column->size() < num_rows) { return Status::InternalError( "Required row size is out of range, need {} rows, column {} has {} " - "rows in " - "fact.", + "rows in fact.", num_rows, argument.column->get_name(), argument.column->size()); } RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql( From 7674c8ebbd9b5d31f2bf6d37ec75ce3b8dd19bbf Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 6 Dec 2023 20:01:06 +0800 Subject: [PATCH 3/7] cr modify --- be/src/vec/sink/vmysql_result_writer.cpp | 26 ++++++++++++++---------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index e3cb54db5671ff..fb8f6f78fcf6f3 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -112,16 +112,17 @@ Status VMysqlResultWriter::append_block(Block& input_block) { return status; } - if (UNLIKELY(input_block.columns() != _output_vexpr_ctxs.size())) { - return Status::InternalError("Requiring {} output columns, but just have {} real columns", - _output_vexpr_ctxs.size(), input_block.columns()); - } - // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec // failed, just return the error status Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); + + if (UNLIKELY(block.columns() != _output_vexpr_ctxs.size())) { + return Status::InternalError("Requiring {} output columns, but just have {} real columns", + _output_vexpr_ctxs.size(), input_block.columns()); + } + // convert one batch auto result = std::make_unique(); auto num_rows = block.rows(); @@ -167,15 +168,18 @@ Status VMysqlResultWriter::append_block(Block& input_block) { arguments.emplace_back(column_ptr.get(), col_const, serde); } - for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) { - for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { - const auto& argument = arguments[col_idx]; - if (argument.column->size() < num_rows) { - return Status::InternalError( + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + const auto& argument = arguments[col_idx]; + if (argument.column->size() < num_rows) { + return Status::InternalError( "Required row size is out of range, need {} rows, column {} has {} " "rows in fact.", num_rows, argument.column->get_name(), argument.column->size()); - } + } + } + + for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) { + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql( *(arguments[col_idx].column), row_buffer, row_idx, arguments[col_idx].is_const)); From b40261988f13e502e367e3f9d070e5d3d19730f5 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 6 Dec 2023 20:07:10 +0800 Subject: [PATCH 4/7] format --- be/src/vec/sink/vmysql_result_writer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index fb8f6f78fcf6f3..45854bb26b3995 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -172,9 +172,9 @@ Status VMysqlResultWriter::append_block(Block& input_block) { const auto& argument = arguments[col_idx]; if (argument.column->size() < num_rows) { return Status::InternalError( - "Required row size is out of range, need {} rows, column {} has {} " - "rows in fact.", - num_rows, argument.column->get_name(), argument.column->size()); + "Required row size is out of range, need {} rows, column {} has {} " + "rows in fact.", + num_rows, argument.column->get_name(), argument.column->size()); } } From d7eb6c7c39da8544fbf76f7f818a4a15c51a4b6c Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 6 Dec 2023 23:02:01 +0800 Subject: [PATCH 5/7] Need --- be/src/vec/sink/vmysql_result_writer.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 45854bb26b3995..6d359f9ce9ea50 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -112,17 +112,14 @@ Status VMysqlResultWriter::append_block(Block& input_block) { return status; } + DCHECK(_output_vexpr_ctxs.empty() != true); + // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec // failed, just return the error status Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - if (UNLIKELY(block.columns() != _output_vexpr_ctxs.size())) { - return Status::InternalError("Requiring {} output columns, but just have {} real columns", - _output_vexpr_ctxs.size(), input_block.columns()); - } - // convert one batch auto result = std::make_unique(); auto num_rows = block.rows(); @@ -142,8 +139,10 @@ Status VMysqlResultWriter::append_block(Block& input_block) { DataTypeSerDeSPtr serde; }; - std::vector arguments; const size_t num_cols = _output_vexpr_ctxs.size(); + std::vector arguments; + arguments.reserve(num_cols); + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(col_idx).column); From 9bb3606dca716cc27abdb00192b7d71abb25b359 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Thu, 7 Dec 2023 11:09:30 +0800 Subject: [PATCH 6/7] Fix column const --- be/src/vec/sink/vmysql_result_writer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 6d359f9ce9ea50..336f627fcc3a5f 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -119,7 +119,6 @@ Status VMysqlResultWriter::append_block(Block& input_block) { Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch auto result = std::make_unique(); auto num_rows = block.rows(); @@ -169,7 +168,8 @@ Status VMysqlResultWriter::append_block(Block& input_block) { for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { const auto& argument = arguments[col_idx]; - if (argument.column->size() < num_rows) { + // const column will only have 1 row, see unpack_if_const + if (argument.column->size() < num_rows && !argument.is_const) { return Status::InternalError( "Required row size is out of range, need {} rows, column {} has {} " "rows in fact.", From 6faf066455b5f69b2e00b427e3dd40f38270e048 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Thu, 7 Dec 2023 12:22:59 +0800 Subject: [PATCH 7/7] rm array check per row --- be/src/vec/data_types/serde/data_type_array_serde.cpp | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index d6d85bd31e9b4a..7aa5ab78cc7bea 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -305,14 +305,8 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column, auto& data = column_array.get_data(); bool is_nested_string = data.is_column_string(); const auto row_idx_of_col_arr = index_check_const(row_idx_of_mysql, col_const); - - if (column_array.size() <= row_idx_of_col_arr) { - return Status::InternalError( - "Logical error, trying to fetch {}-th element of column array, whose size is {}", - row_idx_of_col_arr, column_array.size()); - } - result.open_dynamic_mode(); + if (0 != result.push_string("[", 1)) { return Status::InternalError("pack mysql buffer failed."); }