Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 154 additions & 123 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,159 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
return parse_and_insert_data(col);
}

template <typename T>
Status handle_value(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
T& val) {
RETURN_IF_ERROR(get_int_value<T>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<float>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
float& val) {
RETURN_IF_ERROR(get_float_value<float>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<double>(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, double& val) {
RETURN_IF_ERROR(get_float_value<double>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<std::string>(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, std::string& val) {
RETURN_ERROR_IF_COL_IS_ARRAY(col, sub_type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
return Status::OK();
}

template <>
Status handle_value<bool>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
bool& val) {
if (col.IsBool()) {
val = col.GetBool();
return Status::OK();
}

if (col.IsNumber()) {
val = col.GetInt();
return Status::OK();
}

bool is_nested_str = false;
if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
val = col[0].GetBool();
return Status::OK();
} else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}

const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
const std::string& str_val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
val = StringParser::string_to_bool(str_val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, sub_type);
return Status::OK();
}

template <typename T>
Status process_single_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
T val;
RETURN_IF_ERROR(handle_value<T>(col, sub_type, pure_doc_value, val));
array.push_back(val);
return Status::OK();
}

template <typename T>
Status process_column_array(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
for (const auto& sub_col : col.GetArray()) {
RETURN_IF_ERROR(process_single_column<T>(sub_col, sub_type, pure_doc_value, array));
}
return Status::OK();
}

template <typename T>
Status process_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array) {
if (!col.IsArray()) {
return process_single_column<T>(col, sub_type, pure_doc_value, array);
} else {
return process_column_array<T>(col, sub_type, pure_doc_value, array);
}
}

template <typename DateType, typename RT>
Status process_date_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone) {
if (!col.IsArray()) {
RT data;
RETURN_IF_ERROR(
(get_date_int<DateType, RT>(col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
} else {
for (const auto& sub_col : col.GetArray()) {
RT data;
RETURN_IF_ERROR((get_date_int<DateType, RT>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
}
}
return Status::OK();
}

Status ScrollParser::parse_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array,
const cctz::time_zone& time_zone) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING:
return process_column<std::string>(col, sub_type, pure_doc_value, array);
case TYPE_TINYINT:
return process_column<int8_t>(col, sub_type, pure_doc_value, array);
case TYPE_SMALLINT:
return process_column<int16_t>(col, sub_type, pure_doc_value, array);
case TYPE_INT:
return process_column<int32>(col, sub_type, pure_doc_value, array);
case TYPE_BIGINT:
return process_column<int64_t>(col, sub_type, pure_doc_value, array);
case TYPE_LARGEINT:
return process_column<__int128>(col, sub_type, pure_doc_value, array);
case TYPE_FLOAT:
return process_column<float>(col, sub_type, pure_doc_value, array);
case TYPE_DOUBLE:
return process_column<double>(col, sub_type, pure_doc_value, array);
case TYPE_BOOLEAN:
return process_column<bool>(col, sub_type, pure_doc_value, array);
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
return process_date_column<vectorized::DateV2Value<vectorized::DateV2ValueType>, uint32_t>(
col, sub_type, pure_doc_value, array, time_zone);
}
case TYPE_DATETIMEV2: {
return process_date_column<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(col, sub_type, pure_doc_value, array, time_zone);
}
default:
LOG(ERROR) << "Do not support Array type: " << sub_type;
return Status::InternalError("Unsupported type");
}
}

ScrollParser::ScrollParser(bool doc_value_mode) : _size(0), _line_index(0) {}

ScrollParser::~ScrollParser() = default;
Expand Down Expand Up @@ -684,129 +837,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_ARRAY: {
vectorized::Array array;
const auto& sub_type = tuple_desc->slots()[i]->type().children[0].type;
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, false);
for (const auto& sub_col : col.GetArray()) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
std::string val;
RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type, true);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
val = sub_col.GetString();
}
array.push_back(val);
break;
}
case TYPE_TINYINT: {
int8_t val;
RETURN_IF_ERROR(get_int_value<int8_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_SMALLINT: {
int16_t val;
RETURN_IF_ERROR(
get_int_value<int16_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_INT: {
int32 val;
RETURN_IF_ERROR(get_int_value<int32>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BIGINT: {
int64_t val;
RETURN_IF_ERROR(
get_int_value<int64_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_LARGEINT: {
__int128 val;
RETURN_IF_ERROR(
get_int_value<__int128>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_FLOAT: {
float val {};
RETURN_IF_ERROR(
get_float_value<float>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_DOUBLE: {
double val {};
RETURN_IF_ERROR(
get_float_value<double>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BOOLEAN: {
if (sub_col.IsBool()) {
array.push_back(sub_col.GetBool());
break;
}

if (sub_col.IsNumber()) {
array.push_back(sub_col.GetInt());
break;
}

bool is_nested_str = false;
if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsBool()) {
array.push_back(sub_col[0].GetBool());
break;
} else if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && sub_col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}

const rapidjson::Value& str_col = is_nested_str ? sub_col[0] : sub_col;

const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
bool b = StringParser::string_to_bool(val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, type);
array.push_back(b);
break;
}
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
uint32_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>,
uint32_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
default: {
LOG(ERROR) << "Do not support Array type: " << sub_type;
break;
}
}
}
RETURN_IF_ERROR(parse_column(col, sub_type, pure_doc_value, array, time_zone));
col_ptr->insert(array);
break;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ScrollParser {
int get_size() const;

private:
Status parse_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone);
std::string _scroll_id;
int _size;
rapidjson::SizeType _line_index;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"name": "Andy", "sports": "soccer"}
{"name": "Betty", "sports": "pingpong ball"}
{"name": "Cindy", "sports": "武术"}
{"name": "David", "sports": ["volleyball"]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"]}
{"name": "Andy", "sports": "soccer", "scores": 100}
{"name": "Betty", "sports": "pingpong ball", "scores": 90}
{"name": "Cindy", "sports": "武术", "scores": 89}
{"name": "David", "sports": ["volleyball"], "scores": [77]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"], "scores": [56, 78, 99]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"], "scores": [45, 67, 88]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"], "scores": [34, 56, 78]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"], "scores": [23, 45, 67]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"], "scores": [12, 34, 56]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"], "scores": [1, 23, 45]}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"_meta": {
"doris":{
"array_fields":[
"sports"
"sports",
"scores"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"doc": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"mappings": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Loading