diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 2a259482251a45..286ff7fe772c24 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -37,6 +37,7 @@ #include "vec/sink/result_sink.h" #include "vec/sink/vdata_stream_sender.h" +#include "vec/sink/vmysql_table_writer.h" #include "vec/sink/vtablet_sink.h" namespace doris { @@ -113,10 +114,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.mysql_table_sink) { return Status::InternalError("Missing data buffer sink."); } - - // TODO: figure out good buffer size based on size of output row - MysqlTableSink* mysql_tbl_sink = new MysqlTableSink(pool, row_desc, output_exprs); - sink->reset(mysql_tbl_sink); + if (is_vec) { + doris::vectorized::VMysqlTableSink* vmysql_tbl_sink = new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs); + sink->reset(vmysql_tbl_sink); + } else { + // TODO: figure out good buffer size based on size of output row + MysqlTableSink* mysql_tbl_sink = new MysqlTableSink(pool, row_desc, output_exprs); + sink->reset(mysql_tbl_sink); + } break; #else return Status::InternalError( diff --git a/be/src/runtime/mysql_table_writer.cpp b/be/src/runtime/mysql_table_writer.cpp index a2ae2af0605ac5..90ec03fed646e6 100644 --- a/be/src/runtime/mysql_table_writer.cpp +++ b/be/src/runtime/mysql_table_writer.cpp @@ -26,6 +26,12 @@ #include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "util/types.h" +#include "vec/columns/column_nullable.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/core/materialize_block.h" namespace doris { @@ -38,7 +44,10 @@ std::string MysqlConnInfo::debug_string() const { } MysqlTableWriter::MysqlTableWriter(const std::vector& output_expr_ctxs) - : _output_expr_ctxs(output_expr_ctxs) {} + : _output_expr_ctxs(output_expr_ctxs), _vec_output_expr_ctxs {} {} + +MysqlTableWriter::MysqlTableWriter(const std::vector& output_expr_ctxs) + : _output_expr_ctxs {}, _vec_output_expr_ctxs(output_expr_ctxs) {} MysqlTableWriter::~MysqlTableWriter() { if (_mysql_conn) { @@ -182,4 +191,141 @@ Status MysqlTableWriter::append(RowBatch* batch) { return Status::OK(); } +Status MysqlTableWriter::append(vectorized::Block* block) { + Status status = Status::OK(); + if (block == nullptr || block->rows() == 0) { + return status; + } + + auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _vec_output_expr_ctxs, *block, status); + + auto num_rows = output_block.rows(); + if (UNLIKELY(num_rows == 0)) { + return status; + } + materialize_block_inplace(output_block); + for (int i = 0; i < num_rows; ++i) { + RETURN_IF_ERROR(insert_row(output_block, i)); + } + return Status::OK(); +} + +Status MysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { + _insert_stmt_buffer.clear(); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl); + int num_columns = _vec_output_expr_ctxs.size(); + + for (int i = 0; i < num_columns; ++i) { + auto& column_ptr = block.get_by_position(i).column; + auto& type_ptr = block.get_by_position(i).type; + + if (i != 0) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + + vectorized::ColumnPtr column; + if (type_ptr->is_nullable()) { + column = assert_cast(*column_ptr) + .get_nested_column_ptr(); + if (column_ptr->is_null_at(row)) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + continue; + } + } else { + column = column_ptr; + } + + switch (_vec_output_expr_ctxs[i]->root()->result_type()) { + case TYPE_BOOLEAN: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_TINYINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_SMALLINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_INT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_BIGINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_FLOAT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_DOUBLE: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: { + const auto& string_val = + assert_cast(*column).get_data_at(row); + DCHECK(string_val.data != nullptr); + std::unique_ptr buf(new char[2 * string_val.size + 1]); + mysql_real_escape_string(_mysql_conn, buf.get(), string_val.data, string_val.size); + fmt::format_to(_insert_stmt_buffer, "'{}'", buf.get()); + + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value value = + (DecimalV2Value) + assert_cast&>( + *column) + .get_data()[row]; + fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + int64_t int_val = assert_cast(*column).get_data()[row]; + vectorized::VecDateTimeValue value = + binary_cast(int_val); + + char buf[64]; + char* pos = value.to_string(buf); + std::string str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "'{}'", str); + break; + } + default: { + fmt::memory_buffer err_out; + fmt::format_to(err_out, "can't convert this type to mysql type. type = {}", + _vec_output_expr_ctxs[i]->root()->type().type); + return Status::InternalError(err_out.data()); + } + } + } + + fmt::format_to(_insert_stmt_buffer, "{}", ")"); + + // Insert this to MySQL server + if (mysql_real_query(_mysql_conn, _insert_stmt_buffer.data(), _insert_stmt_buffer.size())) { + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "Insert to mysql server({}) failed, because: {}.", + mysql_get_host_info(_mysql_conn), mysql_error(_mysql_conn)); + return Status::InternalError(err_ss.data()); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/mysql_table_writer.h b/be/src/runtime/mysql_table_writer.h index 8a1d04595d7233..770910cc45daf5 100644 --- a/be/src/runtime/mysql_table_writer.h +++ b/be/src/runtime/mysql_table_writer.h @@ -18,11 +18,11 @@ #ifndef DORIS_BE_RUNTIME_MYSQL_TABLE_WRITER_H #define DORIS_BE_RUNTIME_MYSQL_TABLE_WRITER_H +#include #include #include #include "common/status.h" - #ifndef __DorisMysql #define __DorisMysql void #endif @@ -42,10 +42,15 @@ struct MysqlConnInfo { class RowBatch; class TupleRow; class ExprContext; +namespace vectorized { +class VExprContext; +class Block; +} class MysqlTableWriter { public: MysqlTableWriter(const std::vector& output_exprs); + MysqlTableWriter(const std::vector& output_exprs); ~MysqlTableWriter(); // connect to mysql server @@ -55,6 +60,8 @@ class MysqlTableWriter { Status append(RowBatch* batch); + Status append(vectorized::Block* block); + Status abort_tarns() { return Status::OK(); } Status finish_tarns() { return Status::OK(); } @@ -63,6 +70,10 @@ class MysqlTableWriter { Status insert_row(TupleRow* row); const std::vector& _output_expr_ctxs; + //vectorized mode insert_row + Status insert_row(vectorized::Block& block, size_t row); + const std::vector& _vec_output_expr_ctxs; + fmt::memory_buffer _insert_stmt_buffer; std::string _mysql_tbl; __DorisMysql* _mysql_conn; }; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 6201c67a4425b2..84fc57173352d7 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -149,6 +149,7 @@ set(VEC_FILES sink/result_sink.cpp sink/vdata_stream_sender.cpp sink/vtablet_sink.cpp + sink/vmysql_table_writer.cpp runtime/vdatetime_value.cpp runtime/vdata_stream_recvr.cpp runtime/vdata_stream_mgr.cpp diff --git a/be/src/vec/sink/vmysql_table_writer.cpp b/be/src/vec/sink/vmysql_table_writer.cpp new file mode 100644 index 00000000000000..ef7f9c0168cdd6 --- /dev/null +++ b/be/src/vec/sink/vmysql_table_writer.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exprs/expr.h" +#include "runtime/mem_tracker.h" +#include "runtime/mysql_table_sink.h" +#include "runtime/runtime_state.h" +#include "util/debug_util.h" +#include "util/runtime_profile.h" +#include "vec/sink/vmysql_table_writer.h" +#include "vec/exprs/vexpr.h" + +namespace doris { +namespace vectorized { +VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs) + : _pool(pool), + _row_desc(row_desc), + _t_output_expr(t_exprs), + _mem_tracker(MemTracker::CreateTracker(-1, "VMysqlTableSink")) { + _name = "VMysqlTableSink"; +} + +VMysqlTableSink::~VMysqlTableSink() {} + +Status VMysqlTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(DataSink::init(t_sink)); + const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink; + + _conn_info.host = t_mysql_sink.host; + _conn_info.port = t_mysql_sink.port; + _conn_info.user = t_mysql_sink.user; + _conn_info.passwd = t_mysql_sink.passwd; + _conn_info.db = t_mysql_sink.db; + _mysql_tbl = t_mysql_sink.table; + + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); + return Status::OK(); +} + +Status VMysqlTableSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker)); + std::stringstream title; + title << "VMysqlTableSink (frag_id=" << state->fragment_instance_id() << ")"; + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + return Status::OK(); +} + +Status VMysqlTableSink::open(RuntimeState* state) { + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state)); + // create writer + _writer = state->obj_pool()->add(new doris::MysqlTableWriter(_output_expr_ctxs)); + RETURN_IF_ERROR(_writer->open(_conn_info, _mysql_tbl)); + return Status::OK(); +} + +Status VMysqlTableSink::send(RuntimeState* state, RowBatch* batch) { + return Status::NotSupported("Not Implemented VMysqlTableSink::send(RuntimeState* state, RowBatch* batch)"); +} + +Status VMysqlTableSink::send(RuntimeState* state, Block* block) { + return _writer->append(block); +} + +Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) { + VExpr::close(_output_expr_ctxs, state); + return Status::OK(); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/vmysql_table_writer.h b/be/src/vec/sink/vmysql_table_writer.h new file mode 100644 index 00000000000000..694f97fb6138c5 --- /dev/null +++ b/be/src/vec/sink/vmysql_table_writer.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include + +#include "common/status.h" +#include "exec/data_sink.h" +#include "runtime/mysql_table_writer.h" + +namespace doris { + +class RowDescriptor; +class TExpr; +class TMysqlTableSink; +class RuntimeState; +class RuntimeProfile; +class ExprContext; +class MemTracker; +namespace vectorized { + +class VExprContext; +class VExpr; +// This class is a sinker, which put input data to mysql table +class VMysqlTableSink : public DataSink { +public: + VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs); + + ~VMysqlTableSink(); + + Status init(const TDataSink& thrift_sink) override; + + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + Status send(RuntimeState* state, RowBatch* batch) override; + + Status send(RuntimeState* state, vectorized::Block* block) override; + // Flush all buffered data and close all existing channels to destination + // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; + + RuntimeProfile* profile() override { return _profile; } + +private: + // owned by RuntimeState + ObjectPool* _pool; + const RowDescriptor& _row_desc; + const std::vector& _t_output_expr; + + std::vector _output_expr_ctxs; + MysqlConnInfo _conn_info; + std::string _mysql_tbl; + MysqlTableWriter* _writer; + + RuntimeProfile* _profile; + std::shared_ptr _mem_tracker; +}; +} // namespace vectorized +} // namespace doris +