Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/data_types/data_type_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
auto* map_column = assert_cast<ColumnMap*>(column);

if (*rb.position() != '{') {
return Status::InvalidArgument("map does not start with '{' character, found '{}'",
return Status::InvalidArgument("map does not start with '{}' character, found '{}'", "{",
*rb.position());
}
if (*(rb.end() - 1) != '}') {
return Status::InvalidArgument("map does not end with '}' character, found '{}'",
return Status::InvalidArgument("map does not end with '{}' character, found '{}'", "}",
*(rb.end() - 1));
}

Expand Down
188 changes: 152 additions & 36 deletions be/src/vec/data_types/data_type_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ DataTypeStruct::DataTypeStruct(const DataTypes& elems_, const Strings& names_)
}

Status st = check_tuple_names(names);
//if (!st.ok()) {
//}
}

std::string DataTypeStruct::do_get_name() const {
Expand All @@ -68,28 +66,91 @@ std::string DataTypeStruct::do_get_name() const {
if (i != 0) {
s << ", ";
}

// if (have_explicit_names) {
// s << back_quote_if_need(names[i]) << ' ';
// }

s << elems[i]->get_name();
}
s << ")";

return s.str();
}

// Reads the next slot (a field name or a field value) of a struct literal from `rb`.
//
// Leading whitespace is skipped. A slot may be wrapped in matching '"' or '\''
// characters; the wrapping quotes are stripped from the result. An unquoted slot
// extends up to the next ':' or ',' or up to a '}' that is the last byte of the
// buffer, and has trailing whitespace removed.
//
// @param rb         buffer to read from; on success its position is moved one past
//                   the terminating ':' / ',' / '}'.
// @param output     receives the slot text (a view into rb's memory); left untouched
//                   when the function returns false.
// @param is_name    set to true when the slot was terminated by ':', false otherwise.
// @param has_quota  set to true when the slot started with a quote that was closed.
// @return true when a slot was read; false when the buffer is empty or holds only
//         whitespace, a quoted string is never closed (rb is then moved to its end),
//         non-whitespace text follows a closing quote, or no terminator is found.
bool next_slot_from_string(ReadBuffer& rb, StringRef& output, bool& is_name, bool& has_quota) {
    StringRef element(rb.position(), 0);
    has_quota = false;
    is_name = false;
    if (rb.eof()) {
        return false;
    }

    // ltrim
    // The cast to unsigned char keeps isspace() well defined for bytes >= 0x80.
    while (!rb.eof() && isspace(static_cast<unsigned char>(*rb.position()))) {
        ++rb.position();
    }
    element.data = rb.position();
    // Only whitespace was left: stop here instead of dereferencing the end of the buffer.
    if (rb.eof()) {
        return false;
    }

    // parse string
    if (*rb.position() == '"' || *rb.position() == '\'') {
        const char str_sep = *rb.position();
        size_t str_len = 1;
        // search until next '"' or '\''
        while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) {
            ++str_len;
        }
        // invalid string: the closing quote is missing, consume the rest of the buffer
        if (str_len >= rb.count()) {
            rb.position() = rb.end();
            return false;
        }
        has_quota = true;
        // str_len is the offset of the closing quote, so +1 covers both quotes
        rb.position() += str_len + 1;
        element.size += str_len + 1;
    }

    // parse element until separator ':' or ',' or end '}'
    // ('}' only terminates the slot when it is the very last byte of the buffer)
    while (!rb.eof() && (*rb.position() != ':') && (*rb.position() != ',') &&
           (rb.count() != 1 || *rb.position() != '}')) {
        // after a closing quote only whitespace may precede the separator
        if (has_quota && !isspace(static_cast<unsigned char>(*rb.position()))) {
            return false;
        }
        ++rb.position();
        ++element.size;
    }
    // invalid element: ran out of input without finding a separator
    if (rb.eof()) {
        return false;
    }

    if (*rb.position() == ':') {
        is_name = true;
    }

    // adjust read buffer position to first char of next element
    ++rb.position();

    // rtrim
    while (element.size > 0 &&
           isspace(static_cast<unsigned char>(element.data[element.size - 1]))) {
        --element.size;
    }

    // trim '"' and '\'' for string
    if (element.size >= 2 && (element.data[0] == '"' || element.data[0] == '\'') &&
        element.data[0] == element.data[element.size - 1]) {
        ++element.data;
        element.size -= 2;
    }
    output = element;
    return true;
}

Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const {
DCHECK(!rb.eof());
auto* struct_column = assert_cast<ColumnStruct*>(column);

if (*rb.position() != '{') {
return Status::InvalidArgument("Struct does not start with '{' character, found '{}'",
return Status::InvalidArgument("Struct does not start with '{}' character, found '{}'", "{",
*rb.position());
}
if (rb.count() < 2 || *(rb.end() - 1) != '}') {
return Status::InvalidArgument("Struct does not end with '}' character, found '{}'",
if (*(rb.end() - 1) != '}') {
return Status::InvalidArgument("Struct does not end with '{}' character, found '{}'", "}",
*(rb.end() - 1));
}

Expand All @@ -99,43 +160,98 @@ Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const {
}

++rb.position();

bool is_explicit_names = false;
std::vector<std::string> field_names;
std::vector<ReadBuffer> field_rbs;
field_rbs.reserve(elems.size());
std::vector<size_t> field_pos;

// here get the value "jack" and 20 from {"name":"jack","age":20}
while (!rb.eof()) {
size_t field_len = 0;
auto start = rb.position();
while (!rb.eof() && *start != ',' && *start != '}') {
field_len++;
start++;
StringRef slot(rb.position(), rb.count());
bool has_quota = false;
bool is_name = false;
if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
return Status::InvalidArgument("Cannot read struct field from text '{}'",
slot.to_string());
}
if (field_len >= rb.count()) {
return Status::InvalidArgument("Invalid Length");
}
ReadBuffer field_rb(rb.position(), field_len);

size_t len = 0;
auto start_rb = field_rb.position();
while (!field_rb.eof() && *start_rb != ':') {
len++;
start_rb++;
}
ReadBuffer field(field_rb.position() + len + 1, field_rb.count() - len - 1);
if (is_name) {
std::string name = slot.to_string();
if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
return Status::InvalidArgument("Cannot read struct field from text '{}'",
slot.to_string());
}
ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
field_names.push_back(name);
field_rbs.push_back(field_rb);

if (field.count() >= 2 && ((*field.position() == '"' && *(field.end() - 1) == '"') ||
(*field.position() == '\'' && *(field.end() - 1) == '\''))) {
ReadBuffer field_no_quote(field.position() + 1, field.count() - 2);
field_rbs.push_back(field_no_quote);
if (!is_explicit_names) {
is_explicit_names = true;
}
} else {
field_rbs.push_back(field);
ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
field_rbs.push_back(field_rb);
}
}

rb.position() += field_len + 1;
// TODO: should we support insert default field value when actual field number is less than
// schema field number?
if (field_rbs.size() != elems.size()) {
std::string cmp_str = field_rbs.size() > elems.size() ? "more" : "less";
return Status::InvalidArgument(
"Actual struct field number {} is {} than schema field number {}.",
field_rbs.size(), cmp_str, elems.size());
}

if (is_explicit_names) {
if (field_names.size() != field_rbs.size()) {
return Status::InvalidArgument(
"Struct field name number {} is not equal to field number {}.",
field_names.size(), field_rbs.size());
}
std::unordered_set<std::string> name_set;
for (size_t i = 0; i < field_names.size(); i++) {
// check duplicate fields
auto ret = name_set.insert(field_names[i]);
if (!ret.second) {
return Status::InvalidArgument("Struct field name {} is duplicate with others.",
field_names[i]);
}
// check name valid
auto idx = try_get_position_by_name(field_names[i]);
if (idx == std::nullopt) {
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
field_names[i]);
}
field_pos.push_back(idx.value());
}
} else {
for (size_t i = 0; i < field_rbs.size(); i++) {
field_pos.push_back(i);
}
}

for (size_t idx = 0; idx < elems.size(); idx++) {
elems[idx]->from_string(field_rbs[idx], &struct_column->get_column(idx));
auto field_rb = field_rbs[field_pos[idx]];
// handle empty element
if (field_rb.count() == 0) {
struct_column->get_column(idx).insert_default();
continue;
}
// handle null element
if (field_rb.count() == 4 && strncmp(field_rb.position(), "null", 4) == 0) {
auto& nested_null_col =
reinterpret_cast<ColumnNullable&>(struct_column->get_column(idx));
nested_null_col.insert_null_elements(1);
continue;
}
auto st = elems[idx]->from_string(field_rb, &struct_column->get_column(idx));
if (!st.ok()) {
// we should do column revert if error
for (size_t j = 0; j < idx; j++) {
struct_column->get_column(j).pop_back(1);
}
return st;
}
}

return Status::OK();
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,13 @@ FunctionContext::TypeDesc VExpr::column_type_to_type_desc(const TypeDescriptor&
out.children.push_back(VExpr::column_type_to_type_desc(t));
}
break;
case TYPE_STRUCT:
CHECK(type.children.size() >= 1);
out.type = FunctionContext::TYPE_STRUCT;
for (const auto& t : type.children) {
out.children.push_back(VExpr::column_type_to_type_desc(t));
}
break;
case TYPE_STRING:
out.type = FunctionContext::TYPE_STRING;
out.len = type.len;
Expand Down
5 changes: 5 additions & 0 deletions regression-test/data/load_p0/stream_load/struct_malformat.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1}
3|\N
4|"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1
5|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1
13 changes: 13 additions & 0 deletions regression-test/data/load_p0/stream_load/struct_normal.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1}
3|{'f1':1, 'f2':100, 'f3':100000, 'f4':'a', 'f5':"doris", 'f6':"2023-02-26", 'f7':null, 'f8':null, 'f9':null, 'f10':1.1}
4|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1}
5|{f1: 1, f2: 100, f3: 100000, f4: a, f5: doris, f6: 2023-02-26, f7: "2023-02-26 17:58", f8: 1.01, f9: 3.1415926, f10: 1.1}
6|{"f10":1.1, "f9":3.1415926, "f8":1.01, "f7":"2023-02-26 17:58", "f6":"2023-02-26", "f5":"doris", "f4":'a', "f3":100000, "f2":100, "f1":1}
7|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, f5:doris, f4:a, f3:100000, f2:100, f1:1}
8|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, f5:doris, f4:null, f3:null, f2:null, f1:1}
9|{"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null}
10|{1, 100, 100000, 'a', "doris", "2023-02-26", "2023-02-26 17:58", 1.01, 3.1415926, 1.1}
11|{1, 100, 100000, 'a', "doris", "2023-02-26", null, null, null, 1.1}
12|{null, null, null, null, null, null, null, null, null, null}
13|\N
22 changes: 22 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@
7 [1, 2, 3, 4, 5] \N \N \N \N \N \N \N \N \N
8 [1, 2, 3, 4, 5] \N \N \N \N \N [NULL] \N [NULL] \N

-- !all111 --
1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
3 \N
4 \N
5 \N

-- !all112 --
1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
3 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
4 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
5 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
6 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
7 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
8 {1, NULL, NULL, NULL, 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
9 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
10 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
11 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
12 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
13 \N

-- !sql1 --
-2 -50 1 \N 44
2 -51 1 2 \N
Expand Down
Loading