diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp index d395f2bf709019..cdd3682809916d 100644 --- a/be/src/exec/mysql_scan_node.cpp +++ b/be/src/exec/mysql_scan_node.cpp @@ -75,6 +75,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) { _my_param.user = mysql_table->user(); _my_param.passwd = mysql_table->passwd(); _my_param.db = mysql_table->mysql_db(); + _my_param.charset = mysql_table->charset(); // new one scanner _mysql_scanner.reset(new (std::nothrow) MysqlScanner(_my_param)); diff --git a/be/src/exec/mysql_scanner.cpp b/be/src/exec/mysql_scanner.cpp index 65f6e1e8177880..9ff38487a746ed 100644 --- a/be/src/exec/mysql_scanner.cpp +++ b/be/src/exec/mysql_scanner.cpp @@ -73,7 +73,7 @@ Status MysqlScanner::open() { return _error_status("mysql real connect failed."); } - if (mysql_set_character_set(_my_conn, "utf8")) { + if (mysql_set_character_set(_my_conn, _my_param.charset.c_str())) { return Status::InternalError("mysql set character set failed."); } diff --git a/be/src/exec/mysql_scanner.h b/be/src/exec/mysql_scanner.h index ea7368760e9401..75f60c44ea5ad9 100644 --- a/be/src/exec/mysql_scanner.h +++ b/be/src/exec/mysql_scanner.h @@ -41,6 +41,7 @@ struct MysqlScannerParam { std::string user; std::string passwd; std::string db; + std::string charset; unsigned long client_flag; MysqlScannerParam() : client_flag(0) {} }; diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 6c856f719bbcc1..aa762048b7fe56 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -186,13 +186,14 @@ MySQLTableDescriptor::MySQLTableDescriptor(const TTableDescriptor& tdesc) _host(tdesc.mysqlTable.host), _port(tdesc.mysqlTable.port), _user(tdesc.mysqlTable.user), - _passwd(tdesc.mysqlTable.passwd) {} + _passwd(tdesc.mysqlTable.passwd), + _charset(tdesc.mysqlTable.charset) {} std::string MySQLTableDescriptor::debug_string() const { std::stringstream out; out << "MySQLTable(" << 
TableDescriptor::debug_string() << " _db" << _mysql_db << " table=" << _mysql_table << " host=" << _host << " port=" << _port << " user=" << _user - << " passwd=" << _passwd; + << " passwd=" << _passwd << " charset=" << _charset; return out.str(); } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 884a2a53790ee6..8b853e9b0328f3 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -234,6 +234,7 @@ class MySQLTableDescriptor : public TableDescriptor { const std::string port() const { return _port; } const std::string user() const { return _user; } const std::string passwd() const { return _passwd; } + const std::string charset() const { return _charset; } private: std::string _mysql_db; @@ -242,6 +243,7 @@ class MySQLTableDescriptor : public TableDescriptor { std::string _port; std::string _user; std::string _passwd; + std::string _charset; }; class ODBCTableDescriptor : public TableDescriptor { diff --git a/be/src/runtime/mysql_table_sink.cpp b/be/src/runtime/mysql_table_sink.cpp index ad4f4db0859582..c2cdd257123be4 100644 --- a/be/src/runtime/mysql_table_sink.cpp +++ b/be/src/runtime/mysql_table_sink.cpp @@ -48,6 +48,7 @@ Status MysqlTableSink::init(const TDataSink& t_sink) { _conn_info.passwd = t_mysql_sink.passwd; _conn_info.db = t_mysql_sink.db; _mysql_tbl = t_mysql_sink.table; + _conn_info.charset = t_mysql_sink.charset; // From the thrift expressions create the real exprs. 
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); diff --git a/be/src/runtime/mysql_table_writer.cpp b/be/src/runtime/mysql_table_writer.cpp index 1eee906d80b9b1..def4845a4f3192 100644 --- a/be/src/runtime/mysql_table_writer.cpp +++ b/be/src/runtime/mysql_table_writer.cpp @@ -32,7 +32,7 @@ std::string MysqlConnInfo::debug_string() const { std::stringstream ss; ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db - << ",passwd=" << passwd << ")"; + << ",passwd=" << passwd << ",charset=" << charset << ")"; return ss.str(); } @@ -62,7 +62,7 @@ Status MysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string& } // set character - if (mysql_set_character_set(_mysql_conn, "utf8")) { + if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) { std::stringstream ss; ss << "mysql_set_character_set failed because " << mysql_error(_mysql_conn); return Status::InternalError(ss.str()); diff --git a/be/src/runtime/mysql_table_writer.h b/be/src/runtime/mysql_table_writer.h index 8639bb8b9aa36d..61544ad538dca6 100644 --- a/be/src/runtime/mysql_table_writer.h +++ b/be/src/runtime/mysql_table_writer.h @@ -32,6 +32,7 @@ struct MysqlConnInfo { std::string passwd; std::string db; int port; + std::string charset; std::string debug_string() const; }; diff --git a/be/src/vec/sink/vmysql_table_sink.cpp b/be/src/vec/sink/vmysql_table_sink.cpp index 48e4501fd3cabd..cf112bb815ead5 100644 --- a/be/src/vec/sink/vmysql_table_sink.cpp +++ b/be/src/vec/sink/vmysql_table_sink.cpp @@ -48,6 +48,7 @@ Status VMysqlTableSink::init(const TDataSink& t_sink) { _conn_info.passwd = t_mysql_sink.passwd; _conn_info.db = t_mysql_sink.db; _mysql_tbl = t_mysql_sink.table; + _conn_info.charset = t_mysql_sink.charset; // From the thrift expressions create the real exprs. 
RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); diff --git a/be/src/vec/sink/vmysql_table_writer.cpp b/be/src/vec/sink/vmysql_table_writer.cpp index c141cd65cdf617..0fb1cb89e48254 100644 --- a/be/src/vec/sink/vmysql_table_writer.cpp +++ b/be/src/vec/sink/vmysql_table_writer.cpp @@ -59,7 +59,7 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string } // set character - if (mysql_set_character_set(_mysql_conn, "utf8")) { + if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.", mysql_error(_mysql_conn)); diff --git a/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md index 5ca46b1be0054a..6fb77130c80660 100644 --- a/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md +++ b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md @@ -48,6 +48,7 @@ Which type of external table is mainly identified by the ENGINE type, currently "table" = "table_name" ) ```` + There is also an optional property "charset" that sets the character set of the MySQL connection. The default value is "utf8"; you can set it to "utf8mb4" instead when needed. 
Notice: @@ -133,7 +134,8 @@ Which type of external table is mainly identified by the ENGINE type, currently "user" = "mysql_user", "password" = "mysql_passwd", "database" = "mysql_db_test", - "table" = "mysql_table_test" + "table" = "mysql_table_test", + "charset" = "utf8mb4" ) ```` diff --git a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md index 96bd2f7d9db611..0b0fe64b4833ab 100644 --- a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md +++ b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md @@ -48,6 +48,7 @@ CREATE EXTERNAL TABLE "table" = "table_name" ) ``` + 以及一个可选属性"charset",可以用来设置mysql连接的字符集, 默认值是"utf8"。如有需要,你可以设置为另外一个字符集"utf8mb4"。 注意: @@ -133,7 +134,8 @@ CREATE EXTERNAL TABLE "user" = "mysql_user", "password" = "mysql_passwd", "database" = "mysql_db_test", - "table" = "mysql_table_test" + "table" = "mysql_table_test", + "charset" = "utf8mb4" ) ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java index dd70725acf1525..00ac91b804a8df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -207,7 +207,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { mysqlTable.getUserName(), mysqlTable.getPasswd(), mysqlTable.getMysqlDatabaseName(), - mysqlTable.getMysqlTableName()); + mysqlTable.getMysqlTableName(), + mysqlTable.getCharset()); totalRows.add(row); } else { ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, table.getType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
index df61e2b18af2dc..abe86282240f99 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4282,6 +4282,7 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, Table table, List< sb.append("\"port\" = \"").append(mysqlTable.getPort()).append("\",\n"); sb.append("\"user\" = \"").append(mysqlTable.getUserName()).append("\",\n"); sb.append("\"password\" = \"").append(hidePassword ? "" : mysqlTable.getPasswd()).append("\",\n"); + sb.append("\"charset\" = \"").append(mysqlTable.getCharset()).append("\",\n"); } else { sb.append("\"odbc_catalog_resource\" = \"").append(mysqlTable.getOdbcCatalogResourceName()).append("\",\n"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java index 8f3fa4522f1721..d76b70602d1d00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java @@ -48,6 +48,7 @@ public class MysqlTable extends Table { private static final String MYSQL_PASSWORD = "password"; private static final String MYSQL_DATABASE = "database"; private static final String MYSQL_TABLE = "table"; + private static final String MYSQL_CHARSET = "charset"; private String odbcCatalogResourceName; private String host; @@ -56,6 +57,7 @@ public class MysqlTable extends Table { private String passwd; private String mysqlDatabaseName; private String mysqlTableName; + private String charset; public MysqlTable() { super(TableType.MYSQL); @@ -124,6 +126,15 @@ private void validate(Map properties) throws DdlException { throw new DdlException("Password of MySQL table is null. 
" + "Please set proper resource or add properties('password'='xxxx') when create table"); } + + charset = properties.get(MYSQL_CHARSET); + if (charset == null) { + charset = "utf8"; + } + if (!charset.equalsIgnoreCase("utf8") && !charset.equalsIgnoreCase("utf8mb4")) { + throw new DdlException("Unknown character set of MySQL table. " + + "Please set charset 'utf8' or 'utf8mb4', other charsets not be unsupported now."); + } } mysqlDatabaseName = properties.get(MYSQL_DATABASE); @@ -193,9 +204,16 @@ public String getMysqlTableName() { return mysqlTableName; } + public String getCharset() { + if (charset != null) { + return charset; + } + return "utf8"; + } + public TTableDescriptor toThrift() { - TMySQLTable tMySQLTable = - new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(), mysqlDatabaseName, mysqlTableName); + TMySQLTable tMySQLTable = new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(), + mysqlDatabaseName, mysqlTableName, getCharset()); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MYSQL_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setMysqlTable(tMySQLTable); @@ -213,6 +231,7 @@ public String getSignature(int signatureVersion) { sb.append(getPasswd()); sb.append(mysqlDatabaseName); sb.append(mysqlTableName); + sb.append(getCharset()); String md5 = DigestUtils.md5Hex(sb.toString()); LOG.debug("get signature of mysql table {}: {}. 
signature string: {}", name, md5, sb.toString()); return md5; @@ -230,6 +249,7 @@ public void write(DataOutput out) throws IOException { serializeMap.put(MYSQL_PASSWORD, passwd); serializeMap.put(MYSQL_DATABASE, mysqlDatabaseName); serializeMap.put(MYSQL_TABLE, mysqlTableName); + serializeMap.put(MYSQL_CHARSET, charset); int size = (int) serializeMap.values().stream().filter(v -> { return v != null; @@ -262,5 +282,6 @@ public void readFields(DataInput in) throws IOException { passwd = serializeMap.get(MYSQL_PASSWORD); mysqlDatabaseName = serializeMap.get(MYSQL_DATABASE); mysqlTableName = serializeMap.get(MYSQL_TABLE); + charset = serializeMap.get(MYSQL_CHARSET); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlTableSink.java index 2104a70465f19a..91c3360fa6a9f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlTableSink.java @@ -30,6 +30,7 @@ public class MysqlTableSink extends DataSink { private final String passwd; private final String db; private final String tbl; + private final String charset; public MysqlTableSink(MysqlTable mysqlTable) { host = mysqlTable.getHost(); @@ -38,6 +39,7 @@ public MysqlTableSink(MysqlTable mysqlTable) { passwd = mysqlTable.getPasswd(); db = mysqlTable.getMysqlDatabaseName(); tbl = mysqlTable.getMysqlTableName(); + charset = mysqlTable.getCharset(); } @Override @@ -52,7 +54,7 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { protected TDataSink toThrift() { TDataSink tDataSink = new TDataSink(TDataSinkType.MYSQL_TABLE_SINK); - tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl)); + tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl, charset)); return tDataSink; } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 
1eaede3f9be407..91716a4f9144cb 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -94,6 +94,7 @@ struct TMysqlTableSink { 4: required string passwd 5: required string db 6: required string table + 7: required string charset } struct TOdbcTableSink { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index f78bc5f6aa823f..7aebce70b68884 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -214,6 +214,7 @@ struct TMySQLTable { 4: required string passwd 5: required string db 6: required string table + 7: required string charset } struct TOdbcTable {