Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 69 additions & 32 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/es/es_scroll_parser.h"

#include <cctz/time_zone.h>
#include <gutil/strings/substitute.h>

#include <boost/algorithm/string.hpp>
Expand Down Expand Up @@ -138,14 +139,6 @@ static const std::string INVALID_NULL_VALUE =
return Status::RuntimeError(ss.str()); \
} while (false)

#define PARSE_DATE(dt_val, col, type, is_date_str) \
if ((is_date_str && \
!dt_val.from_date_str(static_cast<const std::string>(col.GetString()).c_str(), \
col.GetStringLength())) || \
(!is_date_str && !dt_val.from_unixtime(col.GetInt64() / 1000, "+08:00"))) { \
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type); \
}

template <typename T>
static Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot,
bool pure_doc_value) {
Expand All @@ -154,7 +147,7 @@ static Status get_int_value(const rapidjson::Value& col, PrimitiveType type, voi
return Status::OK();
}

if (pure_doc_value && col.IsArray()) {
if (pure_doc_value && col.IsArray() && !col.Empty()) {
RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
return Status::OK();
Expand Down Expand Up @@ -184,8 +177,54 @@ static Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type
RT* slot) {
constexpr bool is_datetime_v1 = std::is_same_v<T, vectorized::VecDateTimeValue>;
T dt_val;
PARSE_DATE(dt_val, col, type, is_date_str)
if (is_date_str) {
const std::string str_date = col.GetString();
int str_length = col.GetStringLength();
bool success = false;
// YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DDTHH:MM:SS+08:00 or 2022-08-08T12:10:10.000Z
if (str_length > 19) {
std::chrono::system_clock::time_point tp;
const bool ok =
cctz::parse("%Y-%m-%dT%H:%M:%E*S%Ez", str_date, cctz::utc_time_zone(), &tp);
if (ok) {
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp),
cctz::local_time_zone());
}
} else if (str_length == 19) {
// YYYY-MM-DDTHH:MM:SS
if (*(str_date.c_str() + 10) == 'T') {
std::chrono::system_clock::time_point tp;
const bool ok =
cctz::parse("%Y-%m-%dT%H:%M:%S", str_date, cctz::utc_time_zone(), &tp);
if (ok) {
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp),
cctz::local_time_zone());
}
} else {
// YYYY-MM-DD HH:MM:SS
success = dt_val.from_date_str(str_date.c_str(), str_length);
}

} else if (str_length == 13) {
// string long like "1677895728000"
int64_t time_long = std::atol(str_date.c_str());
if (time_long > 0) {
success = dt_val.from_unixtime(time_long / 1000, cctz::local_time_zone());
}
} else {
// YYYY-MM-DD or others
success = dt_val.from_date_str(str_date.c_str(), str_length);
}

if (!success) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}

} else {
if (!dt_val.from_unixtime(col.GetInt64() / 1000, cctz::local_time_zone())) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
}
if constexpr (is_datetime_v1) {
if (type == TYPE_DATE) {
dt_val.cast_to_date();
Expand All @@ -207,7 +246,7 @@ static Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool
// processing date type field, if a number is encountered, Doris On ES will force it to be processed according to ms
// Doris On ES needs to be consistent with ES, so just divided by 1000 because the unit for from_unixtime is seconds
return get_date_value_int<T, RT>(col, type, false, slot);
} else if (col.IsArray() && pure_doc_value) {
} else if (col.IsArray() && pure_doc_value && !col.Empty()) {
// this happens only when `enable_docvalue_scan = true`
// ES add default format for all field after ES 6.4, if we not provided format for `date` field ES would impose
// a standard date-format for date field as `2020-06-16T00:00:00.000Z`
Expand Down Expand Up @@ -243,7 +282,7 @@ static Status get_float_value(const rapidjson::Value& col, PrimitiveType type, v
return Status::OK();
}

if (pure_doc_value && col.IsArray()) {
if (pure_doc_value && col.IsArray() && !col.Empty()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
return Status::OK();
}
Expand Down Expand Up @@ -271,7 +310,7 @@ static Status insert_float_value(const rapidjson::Value& col, PrimitiveType type
return Status::OK();
}

if (pure_doc_value && col.IsArray() && nullable) {
if (pure_doc_value && col.IsArray() && !col.Empty() && nullable) {
T value = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
Expand Down Expand Up @@ -300,7 +339,7 @@ static Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
return Status::OK();
}

if (pure_doc_value && col.IsArray()) {
if (pure_doc_value && col.IsArray() && !col.Empty()) {
RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
T value = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
Expand Down Expand Up @@ -438,7 +477,9 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
// this may be tricky, but we can work around this issue
std::string val;
if (pure_doc_value) {
if (!col[0].IsString()) {
if (col.Empty()) {
break;
} else if (!col[0].IsString()) {
val = json_value_to_string(col[0]);
} else {
val = col[0].GetString();
Expand Down Expand Up @@ -507,11 +548,11 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
}

bool is_nested_str = false;
if (pure_doc_value && col.IsArray() && col[0].IsBool()) {
if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
int8_t val = col[0].GetBool();
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&val)), 0);
break;
} else if (pure_doc_value && col.IsArray() && col[0].IsString()) {
} else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
Expand All @@ -537,7 +578,9 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
} else {
std::string val;
if (pure_doc_value) {
if (!col[0].IsString()) {
if (col.Empty()) {
break;
} else if (!col[0].IsString()) {
val = json_value_to_string(col[0]);
} else {
val = col[0].GetString();
Expand Down Expand Up @@ -580,19 +623,11 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_VARCHAR:
case TYPE_STRING: {
std::string val;
if (pure_doc_value) {
if (!sub_col[0].IsString()) {
val = json_value_to_string(sub_col[0]);
} else {
val = sub_col[0].GetString();
}
RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, type);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
val = sub_col.GetString();
}
val = sub_col.GetString();
}
array.push_back(val);
break;
Expand Down Expand Up @@ -656,10 +691,12 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
}

bool is_nested_str = false;
if (pure_doc_value && sub_col.IsArray() && sub_col[0].IsBool()) {
if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsBool()) {
array.push_back(sub_col[0].GetBool());
break;
} else if (pure_doc_value && sub_col.IsArray() && sub_col[0].IsString()) {
} else if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && sub_col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ AttachTask::~AttachTask() {

SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
SwitchBthreadLocal::switch_to_bthread_local();
SwitchBthreadLocal::switch_to_bthread_local();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"test5": "2022-08-08 12:10:10",
"test6": 1659931810000,
"test7": 1659931810000,
"test8": "2022-08-08T12:10:10Z",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"test5": "2022-08-09 12:10:10",
"test6": 1660018210000,
"test7": "2022-08-09 12:10:10",
"test8": 1660018210000,
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"test3": 5.0,
"test4": "2022-08-08",
"test5": "2022-08-10 12:10:10",
"test6": 1660018210000,
"test7": "2022-08-10 12:10:10",
"test6": 1660104610000,
"test7": 1660104610000,
"test8": "2022-08-10T12:10:10",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"test1": "string4",
"test2": "text3_4*5",
"test3": 6.0,
"test4": "2022-08-08",
"test5": "2022-08-11 12:10:10",
"test6": 1660191010000,
"test7": "2022-08-11 12:10:10",
"test8": "2022-08-11T12:10:10+09:00",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
"c_integer": [32768, 32769, -32769, -32770],
"c_long": [-1, 0, 1, 2],
"c_unsigned_long": [0, 1, 2, 3],
"c_float": [1.0, 1.1, 1.2, 1.3],
"c_half_float": [1, 2, 3, 4],
"c_double": [1, 2, 3, 4],
"c_scaled_float": [1, 2, 3, 4],
"c_date": ["2020-01-01", "2020-01-02"],
"c_datetime": ["2020-01-01 12:00:00", "2020-01-02 13:01:01"],
"c_keyword": ["a", "b", "c"],
"c_text": ["d", "e", "f"],
"c_ip": ["192.168.0.1", "127.0.0.1"],
"c_person": [
{"name": "Andy", "age": 18},
{"name": "Tim", "age": 28}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
# create index test1
# shellcheck disable=SC2154
curl "http://${ES_6_HOST}:9200/test1" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/es6_test1.json"
# create index test2
# create index test2_20220808
curl "http://${ES_6_HOST}:9200/test2_20220808" -H "Content-Type:application/json" -X PUT -d '@/mnt/scripts/index/es6_test2.json'
# put data
# put data for test1
curl "http://${ES_6_HOST}:9200/test1/doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1_es6.json'
curl "http://${ES_6_HOST}:9200/test1/doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2_es6.json'
curl "http://${ES_6_HOST}:9200/test1/doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3_es6.json'
# put data for test2_20220808
curl "http://${ES_6_HOST}:9200/test2_20220808/doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1_es6.json'
curl "http://${ES_6_HOST}:9200/test2_20220808/doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2_es6.json'
curl "http://${ES_6_HOST}:9200/test2_20220808/doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3_es6.json'
Expand All @@ -36,31 +37,39 @@ curl "http://${ES_6_HOST}:9200/test2_20220808/doc/_mapping" -H "Content-Type:app
# es7
# create index test1
curl "http://${ES_7_HOST}:9200/test1" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/es7_test1.json"
# create index test2
# create index test2_20220808
curl "http://${ES_7_HOST}:9200/test2_20220808" -H "Content-Type:application/json" -X PUT -d '@/mnt/scripts/index/es7_test2.json'
# put data
# put data for test1
curl "http://${ES_7_HOST}:9200/test1/_doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1.json'
curl "http://${ES_7_HOST}:9200/test1/_doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2.json'
curl "http://${ES_7_HOST}:9200/test1/_doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3.json'
curl "http://${ES_7_HOST}:9200/test1/_doc/4" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data4.json'
# put data for test2_20220808
curl "http://${ES_7_HOST}:9200/test2_20220808/_doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1.json'
curl "http://${ES_7_HOST}:9200/test2_20220808/_doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2.json'
curl "http://${ES_7_HOST}:9200/test2_20220808/_doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3.json'
curl "http://${ES_7_HOST}:9200/test2_20220808/_doc/4" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data4.json'

# put _meta for array
curl "http://${ES_7_HOST}:9200/test1/_mapping" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/array_meta.json"
curl "http://${ES_7_HOST}:9200/test2_20220808/_mapping" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/array_meta.json"

# es8
# create index test1
curl "http://${ES_8_HOST}:9200/test1" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/es7_test1.json"
# create index test2
# create index test2_20220808
curl "http://${ES_8_HOST}:9200/test2_20220808" -H "Content-Type:application/json" -X PUT -d '@/mnt/scripts/index/es7_test2.json'
# put data
# put data for test1
curl "http://${ES_8_HOST}:9200/test1/_doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1.json'
curl "http://${ES_8_HOST}:9200/test1/_doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2.json'
curl "http://${ES_8_HOST}:9200/test1/_doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3.json'
curl "http://${ES_8_HOST}:9200/test1/_doc/4" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data4.json'
# put data for test2_20220808
curl "http://${ES_8_HOST}:9200/test2_20220808/_doc/1" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data1.json'
curl "http://${ES_8_HOST}:9200/test2_20220808/_doc/2" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data2.json'
curl "http://${ES_8_HOST}:9200/test2_20220808/_doc/3" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data3.json'
curl "http://${ES_8_HOST}:9200/test2_20220808/_doc/4" -H "Content-Type:application/json" -X POST -d '@/mnt/scripts/data/data4.json'

# put _meta for array
curl "http://${ES_8_HOST}:9200/test1/_mapping" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/array_meta.json"
curl "http://${ES_8_HOST}:9200/test2_20220808/_mapping" -H "Content-Type:application/json" -X PUT -d "@/mnt/scripts/index/array_meta.json"
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss || epoch_millis"
},
"test8": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss || epoch_millis"
},
"test8": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ public static void valid(Map<String, String> properties, boolean isAlter) throws
}
}

/**
 * Normalizes an array of host URLs in place so that every entry carries an explicit scheme.
 *
 * <p>Each entry is trimmed of surrounding whitespace. An entry that does not already start
 * with {@code http://} or {@code https://} is prefixed with {@code https://} when
 * {@code isSslEnabled} is true, and with {@code http://} otherwise.
 *
 * @param urls host entries to normalize; the array elements are overwritten
 * @param isSslEnabled selects which scheme is prepended to entries that lack one
 */
public static void fillUrlsWithSchema(String[] urls, boolean isSslEnabled) {
    // The scheme choice does not depend on the entry, so compute it once.
    final String defaultScheme = isSslEnabled ? "https://" : "http://";
    for (int i = 0; i < urls.length; i++) {
        final String seed = urls[i].trim();
        final boolean hasScheme = seed.startsWith("http://") || seed.startsWith("https://");
        // Always store the trimmed form. Entries that already have a scheme would
        // otherwise keep the whitespace left over from splitting a "a, b" host list,
        // while scheme-less entries would be trimmed.
        urls[i] = hasScheme ? seed : defaultScheme + seed;
    }
}

private Map<String, String> processCompatibleProperties(Map<String, String> props) {
// Compatible with ES catalog properties
Map<String, String> properties = Maps.newHashMap(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ public List<String> needCompatDateFields() {

private void validate(Map<String, String> properties) throws DdlException {
EsResource.valid(properties, false);
hosts = properties.get(EsResource.HOSTS).trim();
seeds = hosts.split(",");
if (properties.containsKey(EsResource.USER)) {
userName = properties.get(EsResource.USER).trim();
}
Expand Down Expand Up @@ -198,6 +196,12 @@ private void validate(Map<String, String> properties) throws DdlException {
maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
}
}

hosts = properties.get(EsResource.HOSTS).trim();
seeds = hosts.split(",");
// httpSslEnabled must already be parsed before it is used here.
EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);

tableContext.put("hosts", hosts);
tableContext.put("userName", userName);
tableContext.put("passwd", passwd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ private Map<String, String> processCompatibleProperties(Map<String, String> prop

public String[] getNodes() {
String hosts = catalogProperty.getOrDefault(EsResource.HOSTS, "");
return hosts.trim().split(",");
String sslEnabled =
catalogProperty.getOrDefault(EsResource.HTTP_SSL_ENABLED, EsResource.HTTP_SSL_ENABLED_DEFAULT_VALUE);
String[] hostUrls = hosts.trim().split(",");
EsResource.fillUrlsWithSchema(hostUrls, Boolean.parseBoolean(sslEnabled));
return hostUrls;
}

public String getUsername() {
Expand Down
Loading