From 917999c7efb3e07e8e76eba1c15e36f0d64b060a Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Tue, 8 Feb 2022 14:22:13 +0800 Subject: [PATCH 1/3] [Vectorized][Feature] support mysql external table insert into stm --- be/src/exec/data_sink.cpp | 14 ++- be/src/runtime/mysql_table_writer.cpp | 156 +++++++++++++++++++++++- be/src/runtime/mysql_table_writer.h | 13 +- be/src/vec/CMakeLists.txt | 1 + be/src/vec/sink/vmysql_table_writer.cpp | 91 ++++++++++++++ be/src/vec/sink/vmysql_table_writer.h | 76 ++++++++++++ 6 files changed, 344 insertions(+), 7 deletions(-) create mode 100644 be/src/vec/sink/vmysql_table_writer.cpp create mode 100644 be/src/vec/sink/vmysql_table_writer.h diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 2a259482251a45..e6ed5f9524845b 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -38,7 +38,7 @@ #include "vec/sink/result_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vtablet_sink.h" - +#include "vec/sink/vmysql_table_writer.h" namespace doris { Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, @@ -113,10 +113,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.mysql_table_sink) { return Status::InternalError("Missing data buffer sink."); } - - // TODO: figure out good buffer size based on size of output row - MysqlTableSink* mysql_tbl_sink = new MysqlTableSink(pool, row_desc, output_exprs); - sink->reset(mysql_tbl_sink); + if (is_vec) { + doris::vectorized::VMysqlTableSink* vmysql_tbl_sink = new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs); + sink->reset(vmysql_tbl_sink); + } else { + // TODO: figure out good buffer size based on size of output row + MysqlTableSink* mysql_tbl_sink = new MysqlTableSink(pool, row_desc, output_exprs); + sink->reset(mysql_tbl_sink); + } break; #else return Status::InternalError( diff --git a/be/src/runtime/mysql_table_writer.cpp b/be/src/runtime/mysql_table_writer.cpp index a2ae2af0605ac5..120fff45a70e35 100644 --- a/be/src/runtime/mysql_table_writer.cpp +++ b/be/src/runtime/mysql_table_writer.cpp @@ -26,6 +26,11 @@ #include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "util/types.h" +#include "vec/columns/column_nullable.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" namespace doris { @@ -38,7 +43,10 @@ std::string MysqlConnInfo::debug_string() const { } MysqlTableWriter::MysqlTableWriter(const std::vector& output_expr_ctxs) - : _output_expr_ctxs(output_expr_ctxs) {} + : _output_expr_ctxs(output_expr_ctxs), _vec_output_expr_ctxs {} {} + +MysqlTableWriter::MysqlTableWriter(const std::vector& output_expr_ctxs) + : _output_expr_ctxs {}, _vec_output_expr_ctxs(output_expr_ctxs) {} MysqlTableWriter::~MysqlTableWriter() { if (_mysql_conn) { @@ -182,4 +190,150 @@ Status MysqlTableWriter::append(RowBatch* batch) { return Status::OK(); } +Status MysqlTableWriter::append(vectorized::Block* block) { + Status status = Status::OK(); + if (block == nullptr || block->rows() == 0) { + return status; + } + + auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _vec_output_expr_ctxs, *block, status); + + auto num_rows = output_block.rows(); + if (UNLIKELY(num_rows == 0)) { + return status; + } + + for (int i = 0; i < num_rows; ++i) { + RETURN_IF_ERROR(insert_row(output_block, i)); + } + return Status::OK(); +} + +Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { + _insert_stmt_buffer.clear(); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl); + int num_columns = _vec_output_expr_ctxs.size(); + + for (int i = 0; i < num_columns; ++i) { + auto column_ptr = block.get_by_position(i).column->convert_to_full_column_if_const(); + auto type_ptr = block.get_by_position(i).type; + + if (i != 0) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + + vectorized::ColumnPtr column; + if (type_ptr->is_nullable()) { + column = assert_cast(*column_ptr) + .get_nested_column_ptr(); + if (column_ptr->is_null_at(row)) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + continue; + } + } else { + column = column_ptr; + } + + switch (_vec_output_expr_ctxs[i]->root()->result_type()) { + case TYPE_BOOLEAN: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_TINYINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_SMALLINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_INT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_BIGINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_FLOAT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + case TYPE_DOUBLE: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + break; + } + + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: { + const auto& string_val = + assert_cast(*column).get_data_at(row); + if (string_val.data == nullptr) { + if (string_val.size == 0) { + fmt::format_to(_insert_stmt_buffer, "{}", "''"); + } else { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + } + } else { + char* buf = new char[2 * string_val.size + 1]; + mysql_real_escape_string(_mysql_conn, buf, string_val.data, string_val.size); + fmt::format_to(_insert_stmt_buffer, "'{}'", buf); + delete[] buf; + } + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value value = + (DecimalV2Value) + assert_cast&>( + *column) + .get_data()[row]; + fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + int64_t int_val = assert_cast(*column).get_data()[row]; + vectorized::VecDateTimeValue value = + binary_cast(int_val); + + char buf[64]; + char* pos = value.to_string(buf); + std::string str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "'{}'", str); + break; + } + default: { + fmt::memory_buffer err_out; + fmt::format_to(err_out, "can't convert this type to mysql type. type = {}", + _vec_output_expr_ctxs[i]->root()->type().type); + return Status::InternalError(err_out.data()); + } + } + } + + fmt::format_to(_insert_stmt_buffer, "{}", ")"); + + // Insert this to MySQL server + std::string insert_stmt = to_string(_insert_stmt_buffer); + LOG(INFO) << insert_stmt; + if (mysql_real_query(_mysql_conn, insert_stmt.c_str(), insert_stmt.length())) { + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "Insert to mysql server({}) failed, because: {}.", + mysql_get_host_info(_mysql_conn), mysql_error(_mysql_conn)); + return Status::InternalError(err_ss.data()); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/mysql_table_writer.h b/be/src/runtime/mysql_table_writer.h index 8a1d04595d7233..42de77bfb9301a 100644 --- a/be/src/runtime/mysql_table_writer.h +++ b/be/src/runtime/mysql_table_writer.h @@ -22,7 +22,7 @@ #include #include "common/status.h" - +#include #ifndef __DorisMysql #define __DorisMysql void #endif @@ -42,10 +42,15 @@ struct MysqlConnInfo { class RowBatch; class TupleRow; class ExprContext; +namespace vectorized { +class VExprContext; +class Block; +} class MysqlTableWriter { public: MysqlTableWriter(const std::vector& output_exprs); + MysqlTableWriter(const std::vector& output_exprs); ~MysqlTableWriter(); // connect to mysql server @@ -55,6 +60,8 @@ class MysqlTableWriter { Status append(RowBatch* batch); + Status append(vectorized::Block* block); + Status abort_tarns() { return Status::OK(); } Status finish_tarns() { return Status::OK(); } @@ -63,6 +70,10 @@ class MysqlTableWriter { Status insert_row(TupleRow* row); const std::vector& _output_expr_ctxs; + //vectorized mode insert_row + Status insert_row(vectorized::Block& block, size_t row); + const std::vector& _vec_output_expr_ctxs; + fmt::memory_buffer _insert_stmt_buffer; std::string _mysql_tbl; __DorisMysql* _mysql_conn; }; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 6201c67a4425b2..84fc57173352d7 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -149,6 +149,7 @@ set(VEC_FILES sink/result_sink.cpp sink/vdata_stream_sender.cpp sink/vtablet_sink.cpp + sink/vmysql_table_writer.cpp runtime/vdatetime_value.cpp runtime/vdata_stream_recvr.cpp runtime/vdata_stream_mgr.cpp diff --git a/be/src/vec/sink/vmysql_table_writer.cpp b/be/src/vec/sink/vmysql_table_writer.cpp new file mode 100644 index 00000000000000..ef7f9c0168cdd6 --- /dev/null +++ b/be/src/vec/sink/vmysql_table_writer.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exprs/expr.h" +#include "runtime/mem_tracker.h" +#include "runtime/mysql_table_sink.h" +#include "runtime/runtime_state.h" +#include "util/debug_util.h" +#include "util/runtime_profile.h" +#include "vec/sink/vmysql_table_writer.h" +#include "vec/exprs/vexpr.h" + +namespace doris { +namespace vectorized { +VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs) + : _pool(pool), + _row_desc(row_desc), + _t_output_expr(t_exprs), + _mem_tracker(MemTracker::CreateTracker(-1, "VMysqlTableSink")) { + _name = "VMysqlTableSink"; +} + +VMysqlTableSink::~VMysqlTableSink() {} + +Status VMysqlTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(DataSink::init(t_sink)); + const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink; + + _conn_info.host = t_mysql_sink.host; + _conn_info.port = t_mysql_sink.port; + _conn_info.user = t_mysql_sink.user; + _conn_info.passwd = t_mysql_sink.passwd; + _conn_info.db = t_mysql_sink.db; + _mysql_tbl = t_mysql_sink.table; + + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); + return Status::OK(); +} + +Status VMysqlTableSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker)); + std::stringstream title; + title << "VMysqlTableSink (frag_id=" << state->fragment_instance_id() << ")"; + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + return Status::OK(); +} + +Status VMysqlTableSink::open(RuntimeState* state) { + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state)); + // create writer + _writer = state->obj_pool()->add(new doris::MysqlTableWriter(_output_expr_ctxs)); + RETURN_IF_ERROR(_writer->open(_conn_info, _mysql_tbl)); + return Status::OK(); +} + +Status VMysqlTableSink::send(RuntimeState* state, RowBatch* batch) { + return Status::NotSupported("Not Implemented VMysqlTableSink::send(RuntimeState* state, RowBatch* batch)"); +} + +Status VMysqlTableSink::send(RuntimeState* state, Block* block) { + return _writer->append(block); +} + +Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) { + VExpr::close(_output_expr_ctxs, state); + return Status::OK(); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/vmysql_table_writer.h b/be/src/vec/sink/vmysql_table_writer.h new file mode 100644 index 00000000000000..694f97fb6138c5 --- /dev/null +++ b/be/src/vec/sink/vmysql_table_writer.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include + +#include "common/status.h" +#include "exec/data_sink.h" +#include "runtime/mysql_table_writer.h" + +namespace doris { + +class RowDescriptor; +class TExpr; +class TMysqlTableSink; +class RuntimeState; +class RuntimeProfile; +class ExprContext; +class MemTracker; +namespace vectorized { + +class VExprContext; +class VExpr; +// This class is a sinker, which put input data to mysql table +class VMysqlTableSink : public DataSink { +public: + VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs); + + ~VMysqlTableSink(); + + Status init(const TDataSink& thrift_sink) override; + + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + Status send(RuntimeState* state, RowBatch* batch) override; + + Status send(RuntimeState* state, vectorized::Block* block) override; + // Flush all buffered data and close all existing channels to destination + // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; + + RuntimeProfile* profile() override { return _profile; } + +private: + // owned by RuntimeState + ObjectPool* _pool; + const RowDescriptor& _row_desc; + const std::vector& _t_output_expr; + + std::vector _output_expr_ctxs; + MysqlConnInfo _conn_info; + std::string _mysql_tbl; + MysqlTableWriter* _writer; + + RuntimeProfile* _profile; + std::shared_ptr _mem_tracker; +}; +} // namespace vectorized +} // namespace doris + From 4fa9fca65ccf47d4e84abbf08598c10794662bd7 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 11 Feb 2022 15:29:48 +0800 Subject: [PATCH 2/3] some change according to review --- be/src/exec/data_sink.cpp | 3 +- be/src/runtime/mysql_table_writer.cpp | 42 +++++++++++---------------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index e6ed5f9524845b..286ff7fe772c24 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -37,8 +37,9 @@ #include "vec/sink/result_sink.h" #include "vec/sink/vdata_stream_sender.h" -#include "vec/sink/vtablet_sink.h" #include "vec/sink/vmysql_table_writer.h" +#include "vec/sink/vtablet_sink.h" + namespace doris { Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, diff --git a/be/src/runtime/mysql_table_writer.cpp b/be/src/runtime/mysql_table_writer.cpp index 120fff45a70e35..90ec03fed646e6 100644 --- a/be/src/runtime/mysql_table_writer.cpp +++ b/be/src/runtime/mysql_table_writer.cpp @@ -31,6 +31,7 @@ #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/core/materialize_block.h" namespace doris { @@ -203,7 +204,7 @@ Status MysqlTableWriter::append(vectorized::Block* block) { if (UNLIKELY(num_rows == 0)) { return status; } - + materialize_block_inplace(output_block); for (int i = 0; i < num_rows; ++i) { RETURN_IF_ERROR(insert_row(output_block, i)); } @@ -216,8 +217,8 @@ Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { int num_columns = _vec_output_expr_ctxs.size(); for (int i = 0; i < num_columns; ++i) { - auto column_ptr = block.get_by_position(i).column->convert_to_full_column_if_const(); - auto type_ptr = block.get_by_position(i).type; + auto& column_ptr = block.get_by_position(i).column; + auto& type_ptr = block.get_by_position(i).type; if (i != 0) { fmt::format_to(_insert_stmt_buffer, "{}", ", "); @@ -238,37 +239,37 @@ Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { switch (_vec_output_expr_ctxs[i]->root()->result_type()) { case TYPE_BOOLEAN: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_TINYINT: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_SMALLINT: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_INT: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_BIGINT: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_FLOAT: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } case TYPE_DOUBLE: { auto& data = assert_cast(*column).get_data(); - fmt::format_to(_insert_stmt_buffer, "{}", std::to_string(data[row])); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); break; } @@ -277,18 +278,11 @@ Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { case TYPE_VARCHAR: { const auto& string_val = assert_cast(*column).get_data_at(row); - if (string_val.data == nullptr) { - if (string_val.size == 0) { - fmt::format_to(_insert_stmt_buffer, "{}", "''"); - } else { - fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); - } - } else { - char* buf = new char[2 * string_val.size + 1]; - mysql_real_escape_string(_mysql_conn, buf, string_val.data, string_val.size); - fmt::format_to(_insert_stmt_buffer, "'{}'", buf); - delete[] buf; - } + DCHECK(string_val.data != nullptr); + std::unique_ptr buf(new char[2 * string_val.size + 1]); + mysql_real_escape_string(_mysql_conn, buf.get(), string_val.data, string_val.size); + fmt::format_to(_insert_stmt_buffer, "'{}'", buf.get()); + break; } case TYPE_DECIMALV2: { @@ -324,9 +318,7 @@ Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { fmt::format_to(_insert_stmt_buffer, "{}", ")"); // Insert this to MySQL server - std::string insert_stmt = to_string(_insert_stmt_buffer); - LOG(INFO) << insert_stmt; - if (mysql_real_query(_mysql_conn, insert_stmt.c_str(), insert_stmt.length())) { + if (mysql_real_query(_mysql_conn, _insert_stmt_buffer.data(), _insert_stmt_buffer.size())) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "Insert to mysql server({}) failed, because: {}.", mysql_get_host_info(_mysql_conn), mysql_error(_mysql_conn)); From fcb2676e585f858be730ca66a0000091dbcaee5d Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Mon, 14 Feb 2022 11:45:59 +0800 Subject: [PATCH 3/3] change head file position --- be/src/runtime/mysql_table_writer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/mysql_table_writer.h b/be/src/runtime/mysql_table_writer.h index 42de77bfb9301a..770910cc45daf5 100644 --- a/be/src/runtime/mysql_table_writer.h +++ b/be/src/runtime/mysql_table_writer.h @@ -18,11 +18,11 @@ #ifndef DORIS_BE_RUNTIME_MYSQL_TABLE_WRITER_H #define DORIS_BE_RUNTIME_MYSQL_TABLE_WRITER_H +#include #include #include #include "common/status.h" -#include #ifndef __DorisMysql #define __DorisMysql void #endif