Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
const auto end_arr_element = offsets[row_idx_of_col_arr];
for (auto j = begin_arr_element; j < end_arr_element; ++j) {
if (j != begin_arr_element) {
if (0 != result.push_string(", ", 2)) {
if (0 != result.push_string(options.mysql_collection_delim.c_str(),
options.mysql_collection_delim.size())) {
return Status::InternalError("pack mysql buffer failed.");
}
}
Expand All @@ -340,6 +341,7 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
return Status::InternalError("pack mysql buffer failed.");
}
} else {
++options.level;
if (is_nested_string && options.wrapper_len > 0) {
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
return Status::InternalError("pack mysql buffer failed.");
Expand All @@ -353,6 +355,7 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
RETURN_IF_ERROR(
nested_serde->write_column_to_mysql(data, result, j, false, options));
}
--options.level;
}
}
if (0 != result.push_string("]", 1)) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/data_types/serde/data_type_map_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
const auto& offsets = map_column.get_offsets();
for (auto j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
if (j != offsets[col_index - 1]) {
if (0 != result.push_string(", ", 2)) {
if (0 != result.push_string(options.mysql_collection_delim.c_str(),
options.mysql_collection_delim.size())) {
return Status::InternalError("pack mysql buffer failed.");
}
}
Expand All @@ -428,6 +429,7 @@ Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
return Status::InternalError("pack mysql buffer failed.");
}
} else {
++options.level;
if (is_key_string && options.wrapper_len > 0) {
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
return Status::InternalError("pack mysql buffer failed.");
Expand All @@ -441,6 +443,7 @@ Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
RETURN_IF_ERROR(key_serde->write_column_to_mysql(nested_keys_column, result, j,
false, options));
}
--options.level;
}
if (0 != result.push_string(&options.map_key_delim, 1)) {
return Status::InternalError("pack mysql buffer failed.");
Expand All @@ -450,6 +453,7 @@ Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
return Status::InternalError("pack mysql buffer failed.");
}
} else {
++options.level;
if (is_val_string && options.wrapper_len > 0) {
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
return Status::InternalError("pack mysql buffer failed.");
Expand All @@ -463,6 +467,7 @@ Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
RETURN_IF_ERROR(value_serde->write_column_to_mysql(nested_values_column, result, j,
false, options));
}
--options.level;
}
}
if (0 != result.push_string("}", 1)) {
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,15 @@ Status DataTypeNumberSerDe<T>::_write_column_to_mysql(const IColumn& column,
int buf_ret = 0;
auto& data = assert_cast<const ColumnType&>(column).get_data();
const auto col_index = index_check_const(row_idx, col_const);
if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) {
if constexpr (std::is_same_v<T, Int8>) {
buf_ret = result.push_tinyint(data[col_index]);
} else if constexpr (std::is_same_v<T, UInt8>) {
if (options.level > 0 && !options.is_bool_value_num) {
std::string bool_value = data[col_index] ? "true" : "false";
result.push_string(bool_value.c_str(), bool_value.size());
} else {
buf_ret = result.push_tinyint(data[col_index]);
}
} else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>) {
buf_ret = result.push_smallint(data[col_index]);
} else if constexpr (std::is_same_v<T, Int32> || std::is_same_v<T, UInt32>) {
Expand Down
20 changes: 20 additions & 0 deletions be/src/vec/data_types/serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ class DataTypeSerDe {
const char* nested_string_wrapper;
int wrapper_len;

/**
* mysql_collection_delim is used to separate elements in collection, such as array, map, struct
* It is used to write to mysql.
*/
std::string mysql_collection_delim = ", ";

/**
* is_bool_value_num is used to display bool value in collection, such as array, map, struct
* eg, if set to true, the array<true> will be:
* [1]
* if set to false, the array<true> will be:
* [true]
*/
bool is_bool_value_num = true;

/**
* Indicate the nested level of column. It is used to control some behavior of serde
*/
mutable int level = 0;

[[nodiscard]] char get_collection_delimiter(
int hive_text_complex_type_delimiter_level) const {
CHECK(0 <= hive_text_complex_type_delimiter_level &&
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/data_types/serde/data_type_struct_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column,
bool begin = true;
for (size_t j = 0; j < elem_serdes_ptrs.size(); ++j) {
if (!begin) {
if (0 != result.push_string(", ", 2)) {
if (0 != result.push_string(options.mysql_collection_delim.c_str(),
options.mysql_collection_delim.size())) {
return Status::InternalError("pack mysql buffer failed.");
}
}
Expand All @@ -375,6 +376,7 @@ Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column,
return Status::InternalError("pack mysql buffer failed.");
}
} else {
++options.level;
if (remove_nullable(col.get_column_ptr(j))->is_column_string() &&
options.wrapper_len > 0) {
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
Expand All @@ -389,6 +391,7 @@ Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column,
RETURN_IF_ERROR(elem_serdes_ptrs[j]->write_column_to_mysql(
col.get_column(j), result, col_index, false, options));
}
--options.level;
}
begin = false;
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
_options.map_key_delim = ':';
_options.null_format = "null";
_options.null_len = 4;
_options.mysql_collection_delim = ", ";
_options.is_bool_value_num = true;
break;
case TSerdeDialect::PRESTO:
// eg:
Expand All @@ -151,6 +153,20 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
_options.map_key_delim = '=';
_options.null_format = "NULL";
_options.null_len = 4;
_options.mysql_collection_delim = ", ";
_options.is_bool_value_num = true;
break;
case TSerdeDialect::HIVE:
// eg:
// array: ["abc","def","",null]
// map: {"k1":null,"k2":"v3"}
_options.nested_string_wrapper = "\"";
_options.wrapper_len = 1;
_options.map_key_delim = ':';
_options.null_format = "null";
_options.null_len = 4;
_options.mysql_collection_delim = ",";
_options.is_bool_value_num = false;
break;
default:
return Status::InternalError("unknown serde dialect: {}", serde_dialect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ private void setFormatOptions() {
statementContext.setFormatOptions(FormatOptions.getForPresto());
break;
case "doris":
case "hive":
statementContext.setFormatOptions(FormatOptions.getDefault());
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4463,9 +4463,11 @@ public void checkSerdeDialect(String serdeDialect) {
throw new UnsupportedOperationException("serdeDialect value is empty");
}

if (!serdeDialect.equalsIgnoreCase("doris") && !serdeDialect.equalsIgnoreCase("presto")
&& !serdeDialect.equalsIgnoreCase("trino")) {
LOG.warn("serdeDialect value is invalid, the invalid value is {}", serdeDialect);
if (!serdeDialect.equalsIgnoreCase("doris")
&& !serdeDialect.equalsIgnoreCase("presto")
&& !serdeDialect.equalsIgnoreCase("trino")
&& !serdeDialect.equalsIgnoreCase("hive")) {
LOG.warn("serde dialect value is invalid, the invalid value is {}", serdeDialect);
throw new UnsupportedOperationException(
"sqlDialect value is invalid, the invalid value is " + serdeDialect);
}
Expand Down Expand Up @@ -4637,6 +4639,8 @@ public TSerdeDialect getSerdeDialect() {
case "presto":
case "trino":
return TSerdeDialect.PRESTO;
case "hive":
return TSerdeDialect.HIVE;
default:
throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect);
}
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ struct TResourceLimit {

enum TSerdeDialect {
DORIS = 0,
PRESTO = 1
PRESTO = 1,
HIVE = 2
}

// Query options that correspond to PaloService.PaloQueryOptions,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql01 --
1 2 3 4 5 1.1 2.0 123456.123456789 2024-06-30 2024-06-30T10:10:11 2024-06-30T10:10:11.123456 59.50.185.152 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff this is a string with , and " abc ef 123ndedwdw true [1,2,3,4,5] [1, 2, 3, null, 5] [1.1, 2.1, 3.1, null, 5] [1.10000, 2.10000, 3.00000, null, 5.12345] ["abc", "de, f"", null, ""] [{"k1":"v1", "k2":null, "k3":"", "k4":"a , "a"}, {"k1":"v1", "k2":null, "k3 , "abc":"", "k4":"a , "a"}] [["abc", "de, f"", null, ""], [], null] \N \N {"k1":"v1", "k2":null, "k3":"", "k4":"a , "a"} {"k1":[["abc", "de, f"", null, ""], [], null], "k2":null} {10:{"k1":[["abc", "de, f"", null, ""], [], null]}, 11:null} \N {"s_id":100, "s_name":"abc , "", "s_address":null} {"s_id":null, "s_name":["abc", "de, f"", null, ""], "s_address":""} ["2024-06-01", null, "2024-06-03"] ["2024-06-01 10:10:10.000", null, "2024-06-03 01:11:23.123"] [1, 1, 0, 0, 1, 0, 0] {"s_id":100, "s_name":"abc , "", "s_gender":1} {"k1":0, "k2":1, "k3":1, "k4":0}

-- !sql01 --
1 2 3 4 5 1.1 2.0 123456.123456789 2024-06-30 2024-06-30T10:10:11 2024-06-30T10:10:11.123456 59.50.185.152 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff this is a string with , and " abc ef 123ndedwdw true [1,2,3,4,5] [1,2,3,null,5] [1.1,2.1,3.1,null,5] [1.10000,2.10000,3.00000,null,5.12345] ["abc","de, f"",null,""] [{"k1":"v1","k2":null,"k3":"","k4":"a , "a"},{"k1":"v1","k2":null,"k3 , "abc":"","k4":"a , "a"}] [["abc","de, f"",null,""],[],null] \N \N {"k1":"v1","k2":null,"k3":"","k4":"a , "a"} {"k1":[["abc","de, f"",null,""],[],null],"k2":null} {10:{"k1":[["abc","de, f"",null,""],[],null]},11:null} \N {"s_id":100,"s_name":"abc , "","s_address":null} {"s_id":null,"s_name":["abc","de, f"",null,""],"s_address":""} ["2024-06-01",null,"2024-06-03"] ["2024-06-01 10:10:10.000",null,"2024-06-03 01:11:23.123"] [true,true,false,false,true,false,false] {"s_id":100,"s_name":"abc , "","s_gender":true} {"k1":false,"k2":true,"k3":true,"k4":false}

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_serde_dialect_hive", "p0") {

sql """create database if not exists test_serde_dialect_hive;"""
sql """use test_serde_dialect_hive;"""
sql """drop table if exists test_serde_dialect_hive_tbl"""
sql """
create table if not exists test_serde_dialect_hive_tbl (
c1 tinyint,
c2 smallint,
c3 int,
c4 bigint,
c5 largeint,
c6 float,
c7 double,
c8 decimal(27, 9),
c9 date,
c10 datetime,
c11 datetime(6),
c12 ipv4,
c13 ipv6,
c14 string,
c15 char(6),
c16 varchar(1024),
c17 boolean,
c18 json,
c19 array<int>,
c20 array<double>,
c21 array<decimal(10, 5)>,
c22 array<string>,
c23 array<map<string, string>>,
c24 array<array<string>>,
c25 array<struct<s_id:int(11), s_name:string, s_address:string>>,
c26 array<struct<s_id:struct<k1:string, k2:decimal(10,2)>, s_name:array<ipv4>, s_address:map<string, ipv6>>>,
c27 map<string, string>,
c28 map<string, array<array<string>>>,
c29 map<int, map<string, array<array<string>>>>,
c30 map<decimal(5, 3), array<struct<s_id:struct<k1:string, k2:decimal(10,2)>, s_name:array<string>, s_address:map<string, string>>>>,
c31 struct<s_id:int(11), s_name:string, s_address:string>,
c32 struct<s_id:int(11), s_name:array<string>, s_address:string>,
c33 array<date>,
c34 array<datetime(3)>,
c35 array<boolean>,
c36 struct<s_id:int(11), s_name:string, s_gender:boolean>,
c37 map<string, boolean>
)
distributed by random buckets 1
properties("replication_num" = "1");
"""

sql """
insert into test_serde_dialect_hive_tbl
(c1, c2,c3, c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c27,c28,c29,c31,c32,c33,c34,c35,c36,c37)
values(
1,2,3,4,5,1.1,2.0000,123456.123456789,"2024-06-30", "2024-06-30 10:10:11", "2024-06-30 10:10:11.123456",
'59.50.185.152',
'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff',
'this is a string with , and "',
'abc ef',
' 123ndedwdw',
true,
'[1, 2, 3, 4, 5]',
[1,2,3,null,5],
[1.1,2.1,3.1,null,5.00],
[1.1,2.1,3.00000,null,5.12345],
['abc', 'de, f"', null, ''],
[{'k1': 'v1', 'k2': null, 'k3':'', 'k4':'a , "a'}, {'k1': 'v1', 'k2': null, 'k3 , "abc':'', 'k4':'a , "a'}],
[['abc', 'de, f"', null, ''],[],null],
{'k1': 'v1', 'k2': null, 'k3':'', 'k4':'a , "a'},
{'k1': [['abc', 'de, f"', null, ''],[],null], 'k2': null},
{10: {'k1': [['abc', 'de, f"', null, ''],[],null]}, 11: null},
named_struct('s_id', 100, 's_name', 'abc , "', 's_address', null),
named_struct('s_id', null, 's_name', ['abc', 'de, f"', null, ''], 's_address', ''),
['2024-06-01',null,'2024-06-03'],
['2024-06-01 10:10:10',null,'2024-06-03 01:11:23.123'],
[true, true, false, false, true, false, false],
named_struct('s_id', 100, 's_name', 'abc , "', 's_gender', true),
{'k1': false, 'k2': true, 'k3':true, 'k4': false}
);
"""

sql """set serde_dialect="doris";"""
qt_sql01 """select * from test_serde_dialect_hive_tbl"""
sql """set serde_dialect="hive";"""
qt_sql01 """select * from test_serde_dialect_hive_tbl"""

test {
sql """set serde_dialect="invalid""""
exception "sqlDialect value is invalid"
}
}
Loading