Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 62 additions & 24 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,32 @@ ESScanReader::ESScanReader(const std::string& target,
std::string filter_path =
_doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;

// When shard_id is negative(-1), the request will be sent to ES without shard preference.
int32 shard_id = std::stoi(_shards);
if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
_exactly_once = true;
std::stringstream scratch;
// just send a normal search against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect
if (_type.empty()) {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" << filter_path;
} else {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
}
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << "&"
<< filter_path;
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
}
}
_search_url = scratch.str();
} else {
Expand All @@ -95,15 +108,27 @@ ESScanReader::ESScanReader(const std::string& target,
// scroll request for scanning
// add terminate_after for the first scroll to avoid decompress all postings list
if (_type.empty()) {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << "&" << filter_path;
} else {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
}
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path << "&terminate_after=" << batch_size_str;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << "&" << filter_path
<< "&terminate_after=" << batch_size_str;
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path << "&terminate_after=" << batch_size_str;
}
}
_init_scroll_url = scratch.str();
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
Expand All @@ -115,11 +140,13 @@ ESScanReader::~ESScanReader() {}

Status ESScanReader::open() {
_is_first = true;
// we do not enable set_fail_on_error for ES http request to get more detail error messages
bool set_fail_on_error = false;
if (_exactly_once) {
RETURN_IF_ERROR(_network_client.init(_search_url));
RETURN_IF_ERROR(_network_client.init(_search_url, set_fail_on_error));
LOG(INFO) << "search request URL: " << _search_url;
} else {
RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
RETURN_IF_ERROR(_network_client.init(_init_scroll_url, set_fail_on_error));
LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
}
_network_client.set_basic_auth(_user_name, _passwd);
Expand All @@ -132,7 +159,8 @@ Status ESScanReader::open() {
Status status = _network_client.execute_post_request(_query, &_cached_response);
if (!status.ok() || _network_client.get_http_status() != 200) {
std::stringstream ss;
ss << "Failed to connect to ES server, errmsg is: " << status;
ss << "Failed to connect to ES server, errmsg is: " << status
<< ", response: " << _cached_response;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
Expand All @@ -155,7 +183,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
if (_exactly_once) {
return Status::OK();
}
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
// we do not enable set_fail_on_error for ES http request to get more detail error messages
bool set_fail_on_error = false;
RETURN_IF_ERROR(_network_client.init(_next_scroll_url, set_fail_on_error));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(_http_timeout_ms);
Expand All @@ -168,13 +198,15 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
long status = _network_client.get_http_status();
if (status == 404) {
LOG(WARNING) << "request scroll search failure 404["
<< ", response: " << (response.empty() ? "empty response" : response);
<< ", response: " << (response.empty() ? "empty response" : response)
<< "]";
return Status::InternalError("No search context found for {}", _scroll_id);
}
if (status != 200) {
LOG(WARNING) << "request scroll search failure["
<< "http status" << status
<< ", response: " << (response.empty() ? "empty response" : response);
<< "http status: " << status
<< ", response: " << (response.empty() ? "empty response" : response)
<< "]";
return Status::InternalError("request scroll search failure: {}",
(response.empty() ? "empty response" : response));
}
Expand Down Expand Up @@ -211,7 +243,9 @@ Status ESScanReader::close() {
}

std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
RETURN_IF_ERROR(_network_client.init(scratch_target));
// we do not enable set_fail_on_error for ES http request to get more detail error messages
bool set_fail_on_error = false;
RETURN_IF_ERROR(_network_client.init(scratch_target, set_fail_on_error));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_method(DELETE);
_network_client.set_content_type("application/json");
Expand All @@ -222,9 +256,13 @@ Status ESScanReader::close() {
std::string response;
RETURN_IF_ERROR(_network_client.execute_delete_request(
ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
if (_network_client.get_http_status() == 200) {
long status = _network_client.get_http_status();
if (status == 200) {
return Status::OK();
} else {
LOG(WARNING) << "es_scan_reader delete scroll context failure["
<< "http status: " << status
<< ", response: " << (response.empty() ? "empty response" : response) << "]";
return Status::InternalError("es_scan_reader delete scroll context failure");
}
}
Expand Down
18 changes: 13 additions & 5 deletions be/src/exec/es/es_scroll_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,19 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
} else {
size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
}
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);

std::string shard_id;
if (properties.find(ESScanReader::KEY_SHARD) != properties.end()) {
shard_id = properties.at(ESScanReader::KEY_SHARD);
}
// To maintain consistency with the query, when shard_id is negative, do not add sort node in scroll request body.
if (!shard_id.empty() && std::stoi(shard_id) >= 0) {
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);
}
// number of documents returned
es_query_dsl.AddMember("size", size, allocator);
rapidjson::StringBuffer buffer;
Expand Down
14 changes: 9 additions & 5 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ HttpClient::~HttpClient() {
}
}

Status HttpClient::init(const std::string& url) {
Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
if (_curl == nullptr) {
_curl = curl_easy_init();
if (_curl == nullptr) {
Expand Down Expand Up @@ -70,10 +70,14 @@ Status HttpClient::init(const std::string& url) {
return Status::InternalError("fail to set CURLOPT_NOSIGNAL");
}
// set fail on error
code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
if (code != CURLE_OK) {
LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_FAILONERROR");
// When this option is set to `1L` (enabled), libcurl will return an error directly
// when encountering HTTP error codes (>= 400), without reading the body of the error response.
if (set_fail_on_error) {
code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
if (code != CURLE_OK) {
LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_FAILONERROR");
}
}
// set redirect
code = curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L);
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HttpClient {

// this function must call before other function,
// you can call this multiple times to reuse this object
Status init(const std::string& url);
Status init(const std::string& url, bool set_fail_on_error = true);

void set_method(HttpMethod method);

Expand Down
Loading