From d1627521931436a86ad2b1720ba93c04e647aa1b Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 15:15:41 +0800 Subject: [PATCH 01/17] Change HttpClient to support http post --- be/src/http/http_client.cpp | 10 ++++++++++ be/test/http/http_client_test.cpp | 12 ++++++++++++ 2 files changed, 22 insertions(+) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 88c4374fe6f618..052543e8c43f03 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -24,7 +24,12 @@ HttpClient::HttpClient() { HttpClient::~HttpClient() { if (_curl != nullptr) { + curl_slist header_list = nullptr; + curl_easy_getinfo(_curl, CURLOPT_HTTPHEADER, &header_list); curl_easy_cleanup(_curl); + if(header_list != nullptr) { + curl_slist_free_all(header_list); + } _curl = nullptr; } if(_header_list != nullptr) { @@ -40,6 +45,11 @@ Status HttpClient::init(const std::string& url) { return Status("fail to initalize curl"); } } else { + curl_slist header_list = nullptr; + curl_easy_getinfo(_curl, CURLOPT_HTTPHEADER, &header_list); + if(slist != nullptr) { + curl_slist_free_all(header_list); + } curl_easy_reset(_curl); } diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 343c60614466a8..9e84e95f1adf01 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -151,6 +151,18 @@ TEST_F(HttpClientTest, post_normal) { ASSERT_STREQ(response.c_str(), request_body.c_str()); } +TEST_F(HttpClientTest, post_normal) { + HttpClient client; + auto st = client.init("http://127.0.0.1:29386/simple_post"); + ASSERT_TRUE(st.ok()); + client.set_method(POST); + std::string response; + std::string request_body = "simple post body query"; + st = client.execute_post_request(request_body, &response); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(response.length(), request_body.length()); +} + } int main(int argc, char* argv[]) { From c0f32446ce256f02bcdf32455e8ed86aab56c150 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 15:41:39 +0800 Subject: [PATCH 02/17] Change HttpClient to support http post --- be/src/http/http_client.cpp | 6 +----- be/src/http/http_client.h | 9 +++++++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 052543e8c43f03..f3bb12128eded4 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -24,8 +24,6 @@ HttpClient::HttpClient() { HttpClient::~HttpClient() { if (_curl != nullptr) { - curl_slist header_list = nullptr; - curl_easy_getinfo(_curl, CURLOPT_HTTPHEADER, &header_list); curl_easy_cleanup(_curl); if(header_list != nullptr) { curl_slist_free_all(header_list); @@ -45,9 +43,7 @@ Status HttpClient::init(const std::string& url) { return Status("fail to initalize curl"); } } else { - curl_slist header_list = nullptr; - curl_easy_getinfo(_curl, CURLOPT_HTTPHEADER, &header_list); - if(slist != nullptr) { + if(header_list != nullptr) { curl_slist_free_all(header_list); } curl_easy_reset(_curl); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 83a27b8d63646f..86db542476883b 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -57,8 +57,13 @@ class HttpClient { // content_type such as "application/json" void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; +<<<<<<< HEAD _header_list = curl_slist_append(_header_list, scratch_str.c_str()); curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); +======= + header_list = curl_slist_append(NULL, scratch_str.c_str()); + curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, header_list); +>>>>>>> Change HttpClient to support http post } // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up @@ -132,7 +137,11 @@ class HttpClient { using HttpCallback = std::function; const HttpCallback* _callback = nullptr; char _error_buf[CURL_ERROR_SIZE]; +<<<<<<< HEAD curl_slist *_header_list = nullptr; +======= + curl_slist *header_list; +>>>>>>> Change HttpClient to support http post }; } From b91e63a317980151cc6f84f684bb39ee5f8acab3 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 17:09:23 +0800 Subject: [PATCH 03/17] Change HttpClient to support http post --- be/src/http/http_client.cpp | 12 ++++++------ be/src/http/http_client.h | 13 +++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index f3bb12128eded4..2335f7ad0b0667 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -25,14 +25,14 @@ HttpClient::HttpClient() { HttpClient::~HttpClient() { if (_curl != nullptr) { curl_easy_cleanup(_curl); - if(header_list != nullptr) { - curl_slist_free_all(header_list); - } _curl = nullptr; } if(_header_list != nullptr) { curl_slist_free_all(_header_list); +<<<<<<< HEAD _header_list = nullptr; +======= +>>>>>>> Change HttpClient to support http post } } @@ -43,15 +43,15 @@ Status HttpClient::init(const std::string& url) { return Status("fail to initalize curl"); } } else { - if(header_list != nullptr) { - curl_slist_free_all(header_list); - } curl_easy_reset(_curl); } if(_header_list != nullptr) { curl_slist_free_all(_header_list); +<<<<<<< HEAD _header_list = nullptr; +======= +>>>>>>> Change HttpClient to support http post } // set error_buf _error_buf[0] = 0; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 86db542476883b..669d1f0220cec8 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -57,6 +57,7 @@ class HttpClient { // content_type such as "application/json" void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; +<<<<<<< HEAD <<<<<<< HEAD _header_list = curl_slist_append(_header_list, scratch_str.c_str()); curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); @@ -68,6 +69,14 @@ class HttpClient { // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up void set_post_body(const std::string& post_body) { +======= + _header_list = curl_slist_append(NULL, scratch_str.c_str()); + curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); + } + + void set_post_body(const std::string& post_body) { + curl_easy_setopt(_curl, CURLOPT_POSTFIELDS, post_body.c_str()); +>>>>>>> Change HttpClient to support http post curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length()); curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } @@ -137,11 +146,15 @@ class HttpClient { using HttpCallback = std::function; const HttpCallback* _callback = nullptr; char _error_buf[CURL_ERROR_SIZE]; +<<<<<<< HEAD <<<<<<< HEAD curl_slist *_header_list = nullptr; ======= curl_slist *header_list; >>>>>>> Change HttpClient to support http post +======= + curl_slist *_header_list = nullptr; +>>>>>>> Change HttpClient to support http post }; } From 4a36991ee6768fca8b780918df6016ff110e9698 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 20:04:49 +0800 Subject: [PATCH 04/17] Add http post feature for HttpClient --- be/src/http/http_client.cpp | 4 ++++ be/src/http/http_client.h | 12 ++++++++++++ be/test/http/http_client_test.cpp | 5 +++++ 3 files changed, 21 insertions(+) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 2335f7ad0b0667..bacb051ae77158 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -29,10 +29,14 @@ HttpClient::~HttpClient() { } if(_header_list != nullptr) { curl_slist_free_all(_header_list); +<<<<<<< HEAD <<<<<<< HEAD _header_list = nullptr; ======= >>>>>>> Change HttpClient to support http post +======= + _header_list = nullptr; +>>>>>>> Add http post feature for HttpClient } } diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 669d1f0220cec8..899d3e0a4497ff 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -54,6 +54,7 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str()); } +<<<<<<< HEAD // content_type such as "application/json" void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; @@ -70,13 +71,24 @@ class HttpClient { // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up void set_post_body(const std::string& post_body) { ======= +======= + // note: set_content_type would reset the http headers + void set_content_type(const std::string content_type) { + std::string scratch_str = "Content-Type: " + content_type; + if (_header_list != nullptr) { + curl_slist_free_all(_header_list); + } +>>>>>>> Add http post feature for HttpClient _header_list = curl_slist_append(NULL, scratch_str.c_str()); curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); } void set_post_body(const std::string& post_body) { +<<<<<<< HEAD curl_easy_setopt(_curl, CURLOPT_POSTFIELDS, post_body.c_str()); >>>>>>> Change HttpClient to support http post +======= +>>>>>>> Add http post feature for HttpClient curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length()); curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 9e84e95f1adf01..243a18f4cbcaed 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -135,6 +135,7 @@ TEST_F(HttpClientTest, get_failed) { std::string response; st = client.execute(&response); ASSERT_FALSE(!st.ok()); +<<<<<<< HEAD } TEST_F(HttpClientTest, post_normal) { @@ -149,6 +150,8 @@ TEST_F(HttpClientTest, post_normal) { ASSERT_TRUE(st.ok()); ASSERT_EQ(response.length(), request_body.length()); ASSERT_STREQ(response.c_str(), request_body.c_str()); +======= +>>>>>>> Add http post feature for HttpClient } TEST_F(HttpClientTest, post_normal) { @@ -156,11 +159,13 @@ TEST_F(HttpClientTest, post_normal) { auto st = client.init("http://127.0.0.1:29386/simple_post"); ASSERT_TRUE(st.ok()); client.set_method(POST); + client.set_basic_auth("test1", ""); std::string response; std::string request_body = "simple post body query"; st = client.execute_post_request(request_body, &response); ASSERT_TRUE(st.ok()); ASSERT_EQ(response.length(), request_body.length()); + ASSERT_STREQ(response.c_str(), request_body.c_str()) } } From ba50741592a639a38346d9ac4e03d72243eee92d Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 20:06:20 +0800 Subject: [PATCH 05/17] Add http post feature for HttpClient --- be/src/http/http_client.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index bacb051ae77158..e8737cc696f619 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -52,10 +52,14 @@ Status HttpClient::init(const std::string& url) { if(_header_list != nullptr) { curl_slist_free_all(_header_list); +<<<<<<< HEAD <<<<<<< HEAD _header_list = nullptr; ======= >>>>>>> Change HttpClient to support http post +======= + _header_list = nullptr; +>>>>>>> Add http post feature for HttpClient } // set error_buf _error_buf[0] = 0; From 01bd40de5b41a728570cb7de704be671cc8376b1 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 20:20:44 +0800 Subject: [PATCH 06/17] Add http post feature for HttpClient --- be/src/http/http_client.cpp | 15 +-------------- be/src/http/http_client.h | 10 +++++++++- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index e8737cc696f619..2047cad466dc16 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -29,14 +29,8 @@ HttpClient::~HttpClient() { } if(_header_list != nullptr) { curl_slist_free_all(_header_list); -<<<<<<< HEAD -<<<<<<< HEAD _header_list = nullptr; -======= ->>>>>>> Change HttpClient to support http post -======= - _header_list = nullptr; ->>>>>>> Add http post feature for HttpClient + } } @@ -52,14 +46,7 @@ Status HttpClient::init(const std::string& url) { if(_header_list != nullptr) { curl_slist_free_all(_header_list); -<<<<<<< HEAD -<<<<<<< HEAD - _header_list = nullptr; -======= ->>>>>>> Change HttpClient to support http post -======= _header_list = nullptr; ->>>>>>> Add http post feature for HttpClient } // set error_buf _error_buf[0] = 0; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 899d3e0a4497ff..b7ce12feb8a7d1 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -54,6 +54,7 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str()); } +<<<<<<< HEAD <<<<<<< HEAD // content_type such as "application/json" void set_content_type(const std::string content_type) { @@ -80,9 +81,16 @@ class HttpClient { } >>>>>>> Add http post feature for HttpClient _header_list = curl_slist_append(NULL, scratch_str.c_str()); +======= + // content_type such as "application/json" + void set_content_type(const std::string content_type) { + std::string scratch_str = "Content-Type: " + content_type; + _header_list = curl_slist_append(_header_list, scratch_str.c_str()); +>>>>>>> Add http post feature for HttpClient curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); } - + + // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up void set_post_body(const std::string& post_body) { <<<<<<< HEAD curl_easy_setopt(_curl, CURLOPT_POSTFIELDS, post_body.c_str()); From 8cd22e71d4f7c66cb0d3a0a215faf6b44fe04d65 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Tue, 19 Mar 2019 20:34:15 +0800 Subject: [PATCH 07/17] Add http post feature for HttpClient --- be/test/http/http_client_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 243a18f4cbcaed..67b3035d9e2f2f 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -165,7 +165,7 @@ TEST_F(HttpClientTest, post_normal) { st = client.execute_post_request(request_body, &response); ASSERT_TRUE(st.ok()); ASSERT_EQ(response.length(), request_body.length()); - ASSERT_STREQ(response.c_str(), request_body.c_str()) + ASSERT_STREQ(response.c_str(), request_body.c_str()); } } From a8e54e1acfac84046b00908b2198f7aefdb017e0 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 11:28:57 +0800 Subject: [PATCH 08/17] Add Elasticsearch scan reader --- be/src/http/http_client.cpp | 10 +- be/src/http/http_client.h | 39 +--- be/src/util/CMakeLists.txt | 3 + be/src/util/es_scan_reader.cpp | 208 +++++++++++++++++++++ be/src/util/es_scan_reader.h | 72 ++++++++ be/src/util/es_search_builder.cpp | 63 +++++++ be/src/util/es_search_builder.h | 45 +++++ be/test/util/CMakeLists.txt | 1 + be/test/util/es_scan_reader_test.cpp | 260 +++++++++++++++++++++++++++ 9 files changed, 664 insertions(+), 37 deletions(-) create mode 100644 be/src/util/es_scan_reader.cpp create mode 100644 be/src/util/es_scan_reader.h create mode 100644 be/src/util/es_search_builder.cpp create mode 100644 be/src/util/es_search_builder.h create mode 100644 be/test/util/es_scan_reader_test.cpp diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 2047cad466dc16..207f7be2bc5e99 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -146,12 +146,18 @@ size_t HttpClient::on_response_data(const void* data, size_t length) { // return execute(callback); // } -Status HttpClient::execute_post_request(const std::string& post_data, std::string* response) { +Status HttpClient::execute_post_request(const std::string& payload, std::string* response) { set_method(POST); - set_post_body(post_data); + set_payload(payload); return execute(response); } +Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) { + set_method(DELETE); + set_payload(payload); + return execute(response); +} + Status HttpClient::execute(const std::function& callback) { _callback = &callback; auto code = curl_easy_perform(_curl); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index b7ce12feb8a7d1..02f4edb4c12443 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -54,49 +54,24 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str()); } -<<<<<<< HEAD -<<<<<<< HEAD // content_type such as "application/json" void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; -<<<<<<< HEAD -<<<<<<< HEAD _header_list = curl_slist_append(_header_list, scratch_str.c_str()); curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); -======= - header_list = curl_slist_append(NULL, scratch_str.c_str()); - curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, header_list); ->>>>>>> Change HttpClient to support http post } // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up - void set_post_body(const std::string& post_body) { -======= -======= // note: set_content_type would reset the http headers void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; if (_header_list != nullptr) { curl_slist_free_all(_header_list); } ->>>>>>> Add http post feature for HttpClient - _header_list = curl_slist_append(NULL, scratch_str.c_str()); -======= - // content_type such as "application/json" - void set_content_type(const std::string content_type) { - std::string scratch_str = "Content-Type: " + content_type; - _header_list = curl_slist_append(_header_list, scratch_str.c_str()); ->>>>>>> Add http post feature for HttpClient curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); } - // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up - void set_post_body(const std::string& post_body) { -<<<<<<< HEAD - curl_easy_setopt(_curl, CURLOPT_POSTFIELDS, post_body.c_str()); ->>>>>>> Change HttpClient to support http post -======= ->>>>>>> Add http post feature for HttpClient + void set_payload(const std::string& post_body) { curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length()); curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } @@ -148,7 +123,9 @@ class HttpClient { // a file to local_path Status download(const std::string& local_path); - Status execute_post_request(const std::string& post_data, std::string* response); + Status execute_post_request(const std::string& payload, std::string* response); + + Status execute_delete_request(const std::string& payload, std::string* response); // execute a simple method, and its response is saved in response argument Status execute(std::string* response); @@ -166,15 +143,7 @@ class HttpClient { using HttpCallback = std::function; const HttpCallback* _callback = nullptr; char _error_buf[CURL_ERROR_SIZE]; -<<<<<<< HEAD -<<<<<<< HEAD - curl_slist *_header_list = nullptr; -======= - curl_slist *header_list; ->>>>>>> Change HttpClient to support http post -======= curl_slist *_header_list = nullptr; ->>>>>>> Change HttpClient to support http post }; } diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 3fa44dc6fa1059..2f33e34c92a9c2 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -74,6 +74,8 @@ add_library(Util STATIC aes_util.cpp string_util.cpp md5.cpp + es_scan_reader.cpp + es_search_builder.cpp ) #ADD_BE_TEST(integer-array-test) @@ -87,3 +89,4 @@ add_library(Util STATIC #ADD_BE_TEST(bit-util-test) #ADD_BE_TEST(rle-test) ##ADD_BE_TEST(perf-counters-test) +##ADD_BE_TEST(es-scan-reader-test) \ No newline at end of file diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp new file mode 100644 index 00000000000000..be42cf2ccdc47c --- /dev/null +++ b/be/src/util/es_scan_reader.cpp @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include "es_scan_reader.h" +#include "rapidjson/writer.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "common/logging.h" +#include "common/status.h" +#include + +namespace doris { +const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields"; +const std::string REQUEST_SCROLL_PATH = "_scroll"; +const std::string REQUEST_PREFERENCE_PREFIX = "&preference=shards:"; +const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; +const std::string REQUEST_SEPARATOR = "/"; +const std::string REQUEST_SCROLL_TIME = "5m"; + +const char* FIELD_SCROLL_ID = "_scroll_id"; +const char* FIELD_HITS = "hits"; +const char* FIELD_INNER_HITS = "hits"; +const char* FIELD_SOURCE = "_source"; +const char* FIELD_TOTAL = "total"; + +ESScanReader::ESScanReader(const std::string& target, uint16_t size,std::map& props) { + LOG(INFO) << "ESScanReader "; + _target = target; + _batch_size = size; + _index = props.at(KEY_INDEX); + _type = props.at(KEY_TYPE); + if (props.find(KEY_USER_NAME) != props.end()) { + _user_name = props.at(KEY_USER_NAME); + } + if (props.find(KEY_PASS_WORD) != props.end()){ + _passwd = props.at(KEY_PASS_WORD); + } + if (props.find(KEY_SHARDS) != props.end()) { + _shards = props.at(KEY_SHARDS); + } + if (props.find(KEY_QUERY) != props.end()) { + _query = props.at(KEY_QUERY); + } + _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH; + _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; + _eos = false; +} + +ESScanReader::~ESScanReader() { + if (_cached_response != nullptr) { + free(_cached_response); + _cached_response = nullptr; + } +} + +Status ESScanReader::open() { + _is_first = true; + RETURN_IF_ERROR(_network_client.init(_init_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + // phase open, we cached the first response for `get_next` phase + _network_client.execute_post_request(_query, _cached_response); + long status = _network_client.get_http_status(); + if (status != 200) { + LOG(WARNING) << "invalid response http status for open: " << status; + return Status(*_cached_response); + } + VLOG(1) << "open _cached response: " << *_cached_response; + rapidjson::Document document_node; + document_node.Parse<0>(_cached_response->c_str()); + // empty index + if (!document_node.HasMember(FIELD_SCROLL_ID)) { + _eos = true; + return Status::OK; + } + rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; + _scroll_id = scroll_node.GetString(); + // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} + rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; + rapidjson::Value &total = document_node[FIELD_TOTAL]; + VLOG(1) << "es_scan_reader total hits: " << total.GetInt() << " documents"; + if (outer_hits_node.HasMember("FIELD_INNER_HITS")) { + return Status("es_scan_reader invalid response from elasticsearch"); + } + rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; + if (!inner_hits_node.IsArray()) { + return Status("es_scan_reader invalid response from elasticsearch"); + } + int size = inner_hits_node.Size(); + if (size < _batch_size) { + _eos = true; + } + return Status::OK; +} + +Status ESScanReader::get_next(bool* eos, std::string* response) { + // if is first scroll request, should return the cached response + if (_is_first) { + // maybe the index or shard is empty + if (_eos) { + *eos = true; + return Status::OK; + } + _is_first = false; + *eos = _eos; + *response = *_cached_response; + return Status::OK; + } + RETURN_IF_ERROR(_network_client.init(_next_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + _network_client.set_timeout_ms(5 * 1000); + rapidjson::Document scroll_dsl; + rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); + scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); + scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::Value scroll_value(REQUEST_SCROLL_TIME.c_str(), allocator); + scroll_dsl.AddMember("scroll", scroll_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + scroll_dsl.Accept(writer); + std::string scroll_dsl_json = buffer.GetString(); + RETURN_IF_ERROR(_network_client.execute_post_request(scroll_dsl_json, response)); + long status = _network_client.get_http_status(); + if (status == 404) { + LOG(WARNING) << "request scroll search failure 404[" + << ", response: " << (response->empty() ? "empty response" : *response); + return Status("No search context found for " + _scroll_id); + } + if (status != 200) { + LOG(WARNING) << "request scroll search failure[" + << "http status" << status + << ", response: " << (response->empty() ? "empty response" : *response); + if (status == 404) { + return Status("No search context found for " + _scroll_id); + } + return Status("request scroll search failure: " + (response->empty() ? "empty response" : *response)); + } + rapidjson::Document document_node; + document_node.Parse<0>(response->c_str()); + if (!document_node.HasMember(FIELD_SCROLL_ID)) { + return Status("Invalid _search/scroll request, please check !"); + } + rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; + _scroll_id = scroll_node.GetString(); + // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} + rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; + // rapidjson::Value &total = document_node[FIELD_TOTAL]; + if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) { + *eos = _eos = true; + return Status::OK; + } + rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; + if (!inner_hits_node.IsArray()) { + return Status("invalid response from elasticsearch"); + } + size_t size = inner_hits_node.Size(); + if (size < _batch_size) { + *eos = _eos = true; + } else { + *eos = _eos = false; + } + return Status::OK; +} + +Status ESScanReader::close() { + std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH; + RETURN_IF_ERROR(_network_client.init(scratch_target)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_method(DELETE); + _network_client.set_content_type("application/json"); + _network_client.set_timeout_ms(5 * 1000); + rapidjson::Document delete_scroll_dsl; + rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); + delete_scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); + delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + delete_scroll_dsl.Accept(writer); + //send DELETE scorll post request + std::string delete_scroll_dsl_json = buffer.GetString(); + std::string response; + RETURN_IF_ERROR(_network_client.execute_delete_request(delete_scroll_dsl_json, &response)); + if (_network_client.get_http_status() == 200) { + return Status::OK; + } else { + return Status("es_scan_reader delete scroll context failure"); + } +} +} diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h new file mode 100644 index 00000000000000..c2c16411196d48 --- /dev/null +++ b/be/src/util/es_scan_reader.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "http/http_client.h" + +using std::string; + +namespace doris { + +class Status; + +class ESScanReader { +private: + std::string _target; + std::string _user_name; + std::string _passwd; + std::string _scroll_id; + HttpClient _network_client; + std::string _index; + std::string _type; + // push down filter + std::string _query; + // elaticsearch shards to fetch document + std::string _shards; + // distinguish the first scroll phase and the following scroll + bool _is_first; + std::string _init_scroll_url; + std::string _next_scroll_url; + bool _eos; + uint16_t _batch_size; + + std::string *_cached_response = new std::string(); + + +public: + static constexpr const char* KEY_USER_NAME = "user"; + static constexpr const char* KEY_PASS_WORD = "passwd"; + static constexpr const char* KEY_INDEX = "index"; + static constexpr const char* KEY_TYPE = "type"; + static constexpr const char* KEY_SHARDS = "shards"; + static constexpr const char* KEY_QUERY = "query"; + static constexpr const char* KEY_BATCH_SIZE = "batch_size"; + ESScanReader(const std::string& target, uint16_t size, std::map& props); + ~ESScanReader(); + + // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next + Status open(); + // invoke get_next to get next batch documents from elasticsearch + Status get_next(bool *eos, std::string* response); + // clear scroll context from elasticsearch + Status close(); + +}; +} + diff --git a/be/src/util/es_search_builder.cpp b/be/src/util/es_search_builder.cpp new file mode 100644 index 00000000000000..8d69d2d62de31e --- /dev/null +++ b/be/src/util/es_search_builder.cpp @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "es_search_builder.h" +#include +#include +#include "common/logging.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +namespace doris { + +SearchRequestBuilder::SearchRequestBuilder() { + +} + +SearchRequestBuilder::~SearchRequestBuilder() { + +} + +std::string SearchRequestBuilder::build() { + rapidjson::Document es_query_dsl; + rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); + es_query_dsl.SetObject(); + if (_fields.size() > 0) { + rapidjson::Value source_node(rapidjson::kArrayType); + for (auto iter = _fields.cbegin(); iter != _fields.cend(); iter++) { + rapidjson::Value field(iter->c_str(), allocator); + source_node.PushBack(field, allocator); + } + es_query_dsl.AddMember("_source", source_node, allocator); + } + + rapidjson::Value sort_node(rapidjson::kArrayType); + rapidjson::Value field("_doc", allocator); + sort_node.PushBack(field, allocator); + es_query_dsl.AddMember("sort", sort_node, allocator); + + es_query_dsl.AddMember("size", _size, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + es_query_dsl.Accept(writer); + std::string es_query_dsl_json = buffer.GetString(); + return es_query_dsl_json; +} + +} diff --git a/be/src/util/es_search_builder.h b/be/src/util/es_search_builder.h new file mode 100644 index 00000000000000..85bc231c8881a1 --- /dev/null +++ b/be/src/util/es_search_builder.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#pragma once +#include +#include + +namespace doris { + +class SearchRequestBuilder { + +private: + std::vector _fields; + uint16_t _size; + +public: + SearchRequestBuilder(); + ~SearchRequestBuilder(); + // build the query DSL for elasticsearch + std::string build(); + + + void set_batch_size(uint16_t batch_size) { + _size = batch_size; + } + void set_selected_fields(std::vector& fields) { + _fields = fields; + } +}; +} diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 0ac6e774d4661f..ac87fbab2f3dfb 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -38,3 +38,4 @@ ADD_BE_TEST(uid_util_test) ADD_BE_TEST(arena_test) ADD_BE_TEST(aes_util_test) ADD_BE_TEST(md5_test) +ADD_BE_TEST(es_scan_reader_test) diff --git a/be/test/util/es_scan_reader_test.cpp b/be/test/util/es_scan_reader_test.cpp new file mode 100644 index 00000000000000..d90f10059880cc --- /dev/null +++ b/be/test/util/es_scan_reader_test.cpp @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "util/es_scan_reader.h" +#include "util/es_search_builder.h" +#include +#include "common/logging.h" +#include "http/ev_http_server.h" +#include "http/http_channel.h" +#include "http/http_handler.h" +#include "http/http_request.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" +#include +#include +#include + +namespace doris { + +class RestSearchAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + req->add_output_header(HttpHeaders::CONTENT_TYPE, "application/json"); + if (req->method() == HttpMethod::POST) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + if (post_doc.HasMember("size")) { + rapidjson::Value& size_value = post_doc["size"]; + size = size_value.GetInt(); + } + std::string _scroll_id(std::to_string(size)); + rapidjson::Document search_result; + rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator(); + search_result.SetObject(); + rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); + search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + rapidjson::Value inner_hits(rapidjson::kArrayType); + rapidjson::Value source_docuement(rapidjson::kObjectType); + source_docuement.AddMember("id", 1, allocator); + rapidjson::Value value_node("1", allocator); + source_docuement.AddMember("value", value_node, allocator); + inner_hits.PushBack(source_docuement, allocator); + outer_hits.AddMember("hits", inner_hits, allocator); + search_result.AddMember("hits", outer_hits, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + search_result.Accept(writer); + //send DELETE scorll post request + std::string search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, search_result_json); + } else { + std::string response = "test1"; + HttpChannel::send_reply(req, response); + } + } +}; + +class RestSearchScrollAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + if (req->method() == HttpMethod::POST) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + std::string scroll_id; + if (!post_doc.HasMember("scroll_id")) { + HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request"); + return; + } else { + rapidjson::Value& scroll_id_value = post_doc["scroll_id"]; + scroll_id = scroll_id_value.GetString(); + int offset = atoi(scroll_id.c_str()); + if (offset > 10) { + rapidjson::Document end_search_result; + rapidjson::Document::AllocatorType &allocator = end_search_result.GetAllocator(); + end_search_result.SetObject(); + rapidjson::Value scroll_id_value("11", allocator); + end_search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + end_search_result.AddMember("hits", outer_hits, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + end_search_result.Accept(writer); + //send DELETE scorll post request + std::string end_search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, end_search_result_json); + return; + } else { + int start = offset + 1; + rapidjson::Document search_result; + rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator(); + search_result.SetObject(); + rapidjson::Value scroll_id_value(std::to_string(start).c_str(), allocator); + search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + rapidjson::Value inner_hits(rapidjson::kArrayType); + rapidjson::Value source_docuement(rapidjson::kObjectType); + source_docuement.AddMember("id", start, allocator); + rapidjson::Value value_node(std::to_string(start).c_str(), allocator); + source_docuement.AddMember("value", value_node, allocator); + inner_hits.PushBack(source_docuement, allocator); + outer_hits.AddMember("hits", inner_hits, allocator); + search_result.AddMember("hits", outer_hits, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + search_result.Accept(writer); + //send DELETE scorll post request + std::string search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, search_result_json); + return; + } + + } + } + } +}; + +class RestClearScrollAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + if (req->method() == HttpMethod::DELETE) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + std::string scroll_id; + if (!post_doc.HasMember("scroll_id")) { + HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request"); + return; + } else { + rapidjson::Document clear_scroll_result; + rapidjson::Document::AllocatorType &allocator = clear_scroll_result.GetAllocator(); + clear_scroll_result.SetObject(); + clear_scroll_result.AddMember("succeeded", true, allocator); + clear_scroll_result.AddMember("num_freed", 1, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + clear_scroll_result.Accept(writer); + std::string clear_scroll_result_json = buffer.GetString(); + HttpChannel::send_reply(req, clear_scroll_result_json); + return; + } + } + } +}; + +static RestSearchAction rest_search_action = RestSearchAction(); +static RestSearchScrollAction rest_search_scroll_action = RestSearchScrollAction(); +static RestClearScrollAction rest_clear_scroll_action = RestClearScrollAction(); +static EvHttpServer* mock_es_server = nullptr; + +class HttpClientTest : public testing::Test { +public: + HttpClientTest() { } + ~HttpClientTest() override { } + + static void SetUpTestCase() { + mock_es_server = new EvHttpServer(29386); + mock_es_server->register_handler(POST, "/{index}/{type}/_search", &rest_search_action); + mock_es_server->register_handler(POST, "/_search/scroll", &rest_search_scroll_action); + mock_es_server->register_handler(DELETE, "/_search/scroll", &rest_clear_scroll_action); + mock_es_server->start(); + } + + static void TearDownTestCase() { + delete mock_es_server; + } +}; + +TEST_F(HttpClientTest, open) { + std::string target = "http://127.0.0.1:29386"; + SearchRequestBuilder search_builder; + search_builder.set_batch_size(1); + std::vector fields = {"id", "value"}; + search_builder.set_selected_fields(fields); + std::map props; + props[ESScanReader::KEY_INDEX] = "tindex"; + props[ESScanReader::KEY_TYPE] = "doc"; + props[ESScanReader::KEY_USER_NAME] = "root"; + props[ESScanReader::KEY_PASS_WORD] = "root"; + props[ESScanReader::KEY_SHARDS] = "0"; + props[ESScanReader::KEY_QUERY] = search_builder.build(); + ESScanReader reader(target, 1, props); + auto st = reader.open(); + // ASSERT_TRUE(st.ok()); + bool eos = false; + while(!eos){ + std::string response; + st = reader.get_next(&eos, &response); + if(eos) { + break; + } + rapidjson::Document docuemnt_node; + docuemnt_node.Parse<0>(response.c_str()); + rapidjson::Value &scroll_node = docuemnt_node["_scroll_id"]; + std::string _scroll_id = scroll_node.GetString(); + int id = atoi(_scroll_id.c_str()); + rapidjson::Value &outer_hits_node = docuemnt_node["hits"]; + rapidjson::Value &inner_hits_node = outer_hits_node["hits"]; + rapidjson::Value &source_node = inner_hits_node[0]; + rapidjson::Value &id_node = source_node["id"]; + rapidjson::Value &value_node = source_node["value"]; + ASSERT_EQ(id, id_node.GetInt()); + std::string value = value_node.GetString(); + ASSERT_EQ(id, atoi(value.c_str())); + ASSERT_TRUE(st.ok()); + } + auto cst = reader.close(); + ASSERT_TRUE(cst.ok()); +} +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 8e169c76ee0e79338b53ba5eab67598e5ecd9008 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 11:57:08 +0800 Subject: [PATCH 09/17] Modify rebase error --- be/src/http/http_client.cpp | 1 - be/test/http/http_client_test.cpp | 17 ----------------- 2 files changed, 18 deletions(-) diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 207f7be2bc5e99..ca061cfea88a0c 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -30,7 +30,6 @@ HttpClient::~HttpClient() { if(_header_list != nullptr) { curl_slist_free_all(_header_list); _header_list = nullptr; - } } diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 67b3035d9e2f2f..343c60614466a8 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -135,23 +135,6 @@ TEST_F(HttpClientTest, get_failed) { std::string response; st = client.execute(&response); ASSERT_FALSE(!st.ok()); -<<<<<<< HEAD -} - -TEST_F(HttpClientTest, post_normal) { - HttpClient client; - auto st = client.init("http://127.0.0.1:29386/simple_post"); - ASSERT_TRUE(st.ok()); - client.set_method(POST); - client.set_basic_auth("test1", ""); - std::string response; - std::string request_body = "simple post body query"; - st = client.execute_post_request(request_body, &response); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(response.length(), request_body.length()); - ASSERT_STREQ(response.c_str(), request_body.c_str()); -======= ->>>>>>> Add http post feature for HttpClient } TEST_F(HttpClientTest, post_normal) { From 04720a859c3506ff66986a9e6778c06bb416d9d6 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 12:08:39 +0800 Subject: [PATCH 10/17] Resolve merge conflict --- be/src/http/http_client.h | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 02f4edb4c12443..d54bc680d33aaf 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -61,16 +61,6 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); } - // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up - // note: set_content_type would reset the http headers - void set_content_type(const std::string content_type) { - std::string scratch_str = "Content-Type: " + content_type; - if (_header_list != nullptr) { - curl_slist_free_all(_header_list); - } - curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); - } - void set_payload(const std::string& post_body) { curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length()); curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); From b13b7bdcc242ca09ba74b85c53532b569bf41daa Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 12:26:03 +0800 Subject: [PATCH 11/17] Add Elasticsearch scan reader --- be/src/util/es_scan_reader.cpp | 2 +- be/src/util/es_scan_reader.h | 2 +- be/src/util/es_search_builder.cpp | 2 +- be/src/util/es_search_builder.h | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp index be42cf2ccdc47c..81c77c1a1935f7 100644 --- a/be/src/util/es_scan_reader.cpp +++ b/be/src/util/es_scan_reader.cpp @@ -39,7 +39,7 @@ const char* FIELD_INNER_HITS = "hits"; const char* FIELD_SOURCE = "_source"; const char* FIELD_TOTAL = "total"; -ESScanReader::ESScanReader(const std::string& target, uint16_t size,std::map& props) { +ESScanReader::ESScanReader(const std::string& target, const uint16_t size,std::map& props) { LOG(INFO) << "ESScanReader "; _target = target; _batch_size = size; diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h index c2c16411196d48..2a4772be3cb214 100644 --- a/be/src/util/es_scan_reader.h +++ b/be/src/util/es_scan_reader.h @@ -57,7 +57,7 @@ class ESScanReader { static constexpr const char* KEY_SHARDS = "shards"; static constexpr const char* KEY_QUERY = "query"; static constexpr const char* KEY_BATCH_SIZE = "batch_size"; - ESScanReader(const std::string& target, uint16_t size, std::map& props); + ESScanReader(const std::string& target, uint16_t size, const std::map& props); ~ESScanReader(); // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next diff --git a/be/src/util/es_search_builder.cpp b/be/src/util/es_search_builder.cpp index 8d69d2d62de31e..ff3e86702195f6 100644 --- a/be/src/util/es_search_builder.cpp +++ b/be/src/util/es_search_builder.cpp @@ -39,7 +39,7 @@ std::string SearchRequestBuilder::build() { es_query_dsl.SetObject(); if (_fields.size() > 0) { rapidjson::Value source_node(rapidjson::kArrayType); - for (auto iter = _fields.cbegin(); iter != _fields.cend(); iter++) { + for (auto iter = _fields.begin(); iter != _fields.end(); iter++) { rapidjson::Value field(iter->c_str(), allocator); source_node.PushBack(field, allocator); } diff --git a/be/src/util/es_search_builder.h b/be/src/util/es_search_builder.h index 85bc231c8881a1..0c267ae7f839dd 100644 --- a/be/src/util/es_search_builder.h +++ b/be/src/util/es_search_builder.h @@ -24,10 +24,6 @@ namespace doris { class SearchRequestBuilder { -private: - std::vector _fields; - uint16_t _size; - public: SearchRequestBuilder(); ~SearchRequestBuilder(); @@ -41,5 +37,9 @@ class SearchRequestBuilder { void set_selected_fields(std::vector& fields) { _fields = fields; } + +private: + std::vector _fields; + uint16_t _size; }; } From 9e1bfcadb50fe6fb60785f704e66bde9501f0cc9 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 13:15:34 +0800 Subject: [PATCH 12/17] Change some way of naming --- be/src/util/CMakeLists.txt | 2 +- ..._search_builder.cpp => es_scroll_query.cpp} | 8 ++++---- .../{es_search_builder.h => es_scroll_query.h} | 6 +++--- be/test/util/es_scan_reader_test.cpp | 18 +++++++++--------- 4 files changed, 17 insertions(+), 17 deletions(-) rename be/src/util/{es_search_builder.cpp => es_scroll_query.cpp} (92%) rename be/src/util/{es_search_builder.h => es_scroll_query.h} (93%) diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 2f33e34c92a9c2..f6ea25abebfb0a 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -75,7 +75,7 @@ add_library(Util STATIC string_util.cpp md5.cpp es_scan_reader.cpp - es_search_builder.cpp + es_scroll_query.cpp ) #ADD_BE_TEST(integer-array-test) diff --git a/be/src/util/es_search_builder.cpp b/be/src/util/es_scroll_query.cpp similarity index 92% rename from be/src/util/es_search_builder.cpp rename to be/src/util/es_scroll_query.cpp index ff3e86702195f6..a9ab3da321ea67 100644 --- a/be/src/util/es_search_builder.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "es_search_builder.h" +#include "es_scroll_query.h" #include #include #include "common/logging.h" @@ -25,15 +25,15 @@ namespace doris { -SearchRequestBuilder::SearchRequestBuilder() { +ESScrollQueryBuilder::ESScrollQueryBuilder() { } -SearchRequestBuilder::~SearchRequestBuilder() { +ESScrollQueryBuilder::~ESScrollQueryBuilder() { } -std::string SearchRequestBuilder::build() { +std::string ESScrollQueryBuilder::build() { rapidjson::Document es_query_dsl; rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); es_query_dsl.SetObject(); diff --git a/be/src/util/es_search_builder.h b/be/src/util/es_scroll_query.h similarity index 93% rename from be/src/util/es_search_builder.h rename to be/src/util/es_scroll_query.h index 0c267ae7f839dd..5ab23dd41f12c6 100644 --- a/be/src/util/es_search_builder.h +++ b/be/src/util/es_scroll_query.h @@ -22,11 +22,11 @@ namespace doris { -class SearchRequestBuilder { +class ESScrollQueryBuilder { public: - SearchRequestBuilder(); - ~SearchRequestBuilder(); + ESScrollQueryBuilder(); + ~ESScrollQueryBuilder(); // build the query DSL for elasticsearch std::string build(); diff --git a/be/test/util/es_scan_reader_test.cpp b/be/test/util/es_scan_reader_test.cpp index d90f10059880cc..97bf654384e63c 100644 --- a/be/test/util/es_scan_reader_test.cpp +++ b/be/test/util/es_scan_reader_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. #include "util/es_scan_reader.h" -#include "util/es_search_builder.h" +#include "util/es_scroll_query.h" #include #include "common/logging.h" #include "http/ev_http_server.h" @@ -193,10 +193,10 @@ static RestSearchScrollAction rest_search_scroll_action = RestSearchScrollAction static RestClearScrollAction rest_clear_scroll_action = RestClearScrollAction(); static EvHttpServer* mock_es_server = nullptr; -class HttpClientTest : public testing::Test { +class MockESServerTest : public testing::Test { public: - HttpClientTest() { } - ~HttpClientTest() override { } + MockESServerTest() { } + ~MockESServerTest() override { } static void SetUpTestCase() { mock_es_server = new EvHttpServer(29386); @@ -211,19 +211,19 @@ class HttpClientTest : public testing::Test { } }; -TEST_F(HttpClientTest, open) { +TEST_F(MockESServerTest, workflow) { std::string target = "http://127.0.0.1:29386"; - SearchRequestBuilder search_builder; - search_builder.set_batch_size(1); + ESScrollQueryBuilder scroll_query_builder; + scroll_query_builder.set_batch_size(1); std::vector fields = {"id", "value"}; - search_builder.set_selected_fields(fields); + scroll_query_builder.set_selected_fields(fields); std::map props; props[ESScanReader::KEY_INDEX] = "tindex"; props[ESScanReader::KEY_TYPE] = "doc"; props[ESScanReader::KEY_USER_NAME] = "root"; props[ESScanReader::KEY_PASS_WORD] = "root"; props[ESScanReader::KEY_SHARDS] = "0"; - props[ESScanReader::KEY_QUERY] = search_builder.build(); + props[ESScanReader::KEY_QUERY] = scroll_query_builder.build(); ESScanReader reader(target, 1, props); auto st = reader.open(); // ASSERT_TRUE(st.ok()); From d228a503ec6b594aa016066ca63f97ceb649090a Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 14:31:52 +0800 Subject: [PATCH 13/17] Refactor Elasticsearch releated logic --- be/src/util/CMakeLists.txt | 1 + be/src/util/es_scan_reader.cpp | 87 ++++---------------------------- be/src/util/es_scan_reader.h | 41 +++++++-------- be/src/util/es_scroll_parser.cpp | 78 ++++++++++++++++++++++++++++ be/src/util/es_scroll_parser.h | 44 ++++++++++++++++ be/src/util/es_scroll_query.cpp | 26 ++++++++++ be/src/util/es_scroll_query.h | 3 ++ 7 files changed, 182 insertions(+), 98 deletions(-) create mode 100644 be/src/util/es_scroll_parser.cpp create mode 100644 be/src/util/es_scroll_parser.h diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index f6ea25abebfb0a..fb38438761d165 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -76,6 +76,7 @@ add_library(Util STATIC md5.cpp es_scan_reader.cpp es_scroll_query.cpp + es_scroll_parser.cpp ) #ADD_BE_TEST(integer-array-test) diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp index 81c77c1a1935f7..423e46934dde2d 100644 --- a/be/src/util/es_scan_reader.cpp +++ b/be/src/util/es_scan_reader.cpp @@ -18,6 +18,7 @@ #include #include #include "es_scan_reader.h" +#include "es_scroll_query.h" #include "rapidjson/writer.h" #include "rapidjson/document.h" #include "rapidjson/stringbuffer.h" @@ -33,13 +34,7 @@ const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; const std::string REQUEST_SEPARATOR = "/"; const std::string REQUEST_SCROLL_TIME = "5m"; -const char* FIELD_SCROLL_ID = "_scroll_id"; -const char* FIELD_HITS = "hits"; -const char* FIELD_INNER_HITS = "hits"; -const char* FIELD_SOURCE = "_source"; -const char* FIELD_TOTAL = "total"; - -ESScanReader::ESScanReader(const std::string& target, const uint16_t size,std::map& props) { +ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map& props) { LOG(INFO) << "ESScanReader "; _target = target; _batch_size = size; @@ -60,6 +55,7 @@ ESScanReader::ESScanReader(const std::string& target, const uint16_t size,std::m _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH; _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; _eos = false; + _parser.set_batch_size(size); } ESScanReader::~ESScanReader() { @@ -82,30 +78,8 @@ Status ESScanReader::open() { return Status(*_cached_response); } VLOG(1) << "open _cached response: " << *_cached_response; - rapidjson::Document document_node; - document_node.Parse<0>(_cached_response->c_str()); - // empty index - if (!document_node.HasMember(FIELD_SCROLL_ID)) { - _eos = true; - return Status::OK; - } - rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; - _scroll_id = scroll_node.GetString(); - // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} - rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; - rapidjson::Value &total = document_node[FIELD_TOTAL]; - VLOG(1) << "es_scan_reader total hits: " << total.GetInt() << " documents"; - if (outer_hits_node.HasMember("FIELD_INNER_HITS")) { - return Status("es_scan_reader invalid response from elasticsearch"); - } - rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; - if (!inner_hits_node.IsArray()) { - return Status("es_scan_reader invalid response from elasticsearch"); - } - int size = inner_hits_node.Size(); - if (size < _batch_size) { - _eos = true; - } + RETURN_IF_ERROR(_parser.parse(*_cached_response)); + _eos = _parser.has_next(); return Status::OK; } @@ -126,18 +100,7 @@ Status ESScanReader::get_next(bool* eos, std::string* response) { _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); - rapidjson::Document scroll_dsl; - rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); - scroll_dsl.SetObject(); - rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); - scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); - rapidjson::Value scroll_value(REQUEST_SCROLL_TIME.c_str(), allocator); - scroll_dsl.AddMember("scroll", scroll_value, allocator); - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - scroll_dsl.Accept(writer); - std::string scroll_dsl_json = buffer.GetString(); - RETURN_IF_ERROR(_network_client.execute_post_request(scroll_dsl_json, response)); + RETURN_IF_ERROR(_network_client.execute_post_request(ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), response)); long status = _network_client.get_http_status(); if (status == 404) { LOG(WARNING) << "request scroll search failure 404[" @@ -153,30 +116,8 @@ Status ESScanReader::get_next(bool* eos, std::string* response) { } return Status("request scroll search failure: " + (response->empty() ? "empty response" : *response)); } - rapidjson::Document document_node; - document_node.Parse<0>(response->c_str()); - if (!document_node.HasMember(FIELD_SCROLL_ID)) { - return Status("Invalid _search/scroll request, please check !"); - } - rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; - _scroll_id = scroll_node.GetString(); - // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} - rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; - // rapidjson::Value &total = document_node[FIELD_TOTAL]; - if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) { - *eos = _eos = true; - return Status::OK; - } - rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; - if (!inner_hits_node.IsArray()) { - return Status("invalid response from elasticsearch"); - } - size_t size = inner_hits_node.Size(); - if (size < _batch_size) { - *eos = _eos = true; - } else { - *eos = _eos = false; - } + RETURN_IF_ERROR(_parser.parse(*response)); + *eos = _eos = _parser.has_next(); return Status::OK; } @@ -187,18 +128,8 @@ Status ESScanReader::close() { _network_client.set_method(DELETE); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); - rapidjson::Document delete_scroll_dsl; - rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); - delete_scroll_dsl.SetObject(); - rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); - delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - delete_scroll_dsl.Accept(writer); - //send DELETE scorll post request - std::string delete_scroll_dsl_json = buffer.GetString(); std::string response; - RETURN_IF_ERROR(_network_client.execute_delete_request(delete_scroll_dsl_json, &response)); + RETURN_IF_ERROR(_network_client.execute_delete_request(ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response)); if (_network_client.get_http_status() == 200) { return Status::OK; } else { diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h index 2a4772be3cb214..48e5fe6589ddac 100644 --- a/be/src/util/es_scan_reader.h +++ b/be/src/util/es_scan_reader.h @@ -19,6 +19,7 @@ #include #include "http/http_client.h" +#include "es_scroll_parser.h" using std::string; @@ -27,6 +28,25 @@ namespace doris { class Status; class ESScanReader { + +public: + static constexpr const char* KEY_USER_NAME = "user"; + static constexpr const char* KEY_PASS_WORD = "passwd"; + static constexpr const char* KEY_INDEX = "index"; + static constexpr const char* KEY_TYPE = "type"; + static constexpr const char* KEY_SHARDS = "shards"; + static constexpr const char* KEY_QUERY = "query"; + static constexpr const char* KEY_BATCH_SIZE = "batch_size"; + ESScanReader(const std::string& target, uint16_t size, const std::map& props); + ~ESScanReader(); + + // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next + Status open(); + // invoke get_next to get next batch documents from elasticsearch + Status get_next(bool *eos, std::string* response); + // clear scroll context from elasticsearch + Status close(); + private: std::string _target; std::string _user_name; @@ -47,26 +67,7 @@ class ESScanReader { uint16_t _batch_size; std::string *_cached_response = new std::string(); - - -public: - static constexpr const char* KEY_USER_NAME = "user"; - static constexpr const char* KEY_PASS_WORD = "passwd"; - static constexpr const char* KEY_INDEX = "index"; - static constexpr const char* KEY_TYPE = "type"; - static constexpr const char* KEY_SHARDS = "shards"; - static constexpr const char* KEY_QUERY = "query"; - static constexpr const char* KEY_BATCH_SIZE = "batch_size"; - ESScanReader(const std::string& target, uint16_t size, const std::map& props); - ~ESScanReader(); - - // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next - Status open(); - // invoke get_next to get next batch documents from elasticsearch - Status get_next(bool *eos, std::string* response); - // clear scroll context from elasticsearch - Status close(); - + ScrollParser _parser; }; } diff --git a/be/src/util/es_scroll_parser.cpp b/be/src/util/es_scroll_parser.cpp new file mode 100644 index 00000000000000..1a12c4f48ed6e6 --- /dev/null +++ b/be/src/util/es_scroll_parser.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "es_scroll_parser.h" +#include "rapidjson/document.h" +#include "common/logging.h" +#include "common/status.h" + +namespace doris { + +const char* FIELD_SCROLL_ID = "_scroll_id"; +const char* FIELD_HITS = "hits"; +const char* FIELD_INNER_HITS = "hits"; +const char* FIELD_SOURCE = "_source"; +const char* FIELD_TOTAL = "total"; + +ScrollParser::ScrollParser() { + _eos = false; + _total = 0; +} + +ScrollParser::~ScrollParser() { +} + + +Status ScrollParser::parse(std::string scroll_result) { + rapidjson::Document document_node; + document_node.Parse<0>(scroll_result.c_str()); + if (!document_node.HasMember(FIELD_SCROLL_ID)) { + return Status("maybe not a scroll request"); + } + rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; + _scroll_id = scroll_node.GetString(); + // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} + rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; + rapidjson::Value &total = document_node[FIELD_TOTAL]; + _total = total.GetInt(); + if (_total == 0) { + _eos = true; + return Status::OK; + } + VLOG(1) << "es_scan_reader total hits: " << _total << " documents"; + rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; + if (!inner_hits_node.IsArray()) { + return Status("invalid response from elasticsearch"); + } + _size = inner_hits_node.Size(); + if (_size < _batch_size) { + _eos = true; + } + return Status::OK; +} + +bool ScrollParser::has_next() { + return _eos; +} + +bool ScrollParser::count() { + return _size; +} + +std::string ScrollParser::get_scroll_id() { + return _scroll_id; +} +} diff --git a/be/src/util/es_scroll_parser.h b/be/src/util/es_scroll_parser.h new file mode 100644 index 00000000000000..4a7a850a40bdb6 --- /dev/null +++ b/be/src/util/es_scroll_parser.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include + +namespace doris { + +class Status; +class ScrollParser { + +public: + ScrollParser(); + ~ScrollParser(); + std::string get_scroll_id(); + bool count(); + uint32_t total(); + Status parse(std::string scroll_result); + bool has_next(); + void set_batch_size(int batch_size) { + _batch_size = batch_size; + } + +private: + std::string _scroll_id; + bool _eos; + int _total; + int _size; + int _batch_size; +}; +} diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp index a9ab3da321ea67..e29a1be7d6ab68 100644 --- a/be/src/util/es_scroll_query.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -33,6 +33,32 @@ ESScrollQueryBuilder::~ESScrollQueryBuilder() { } +std::string ESScrollQueryBuilder::build_next_scroll_body(std::string scroll_id, std::string scroll) { + rapidjson::Document scroll_dsl; + rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); + scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); + scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::Value scroll_value(scroll.c_str(), allocator); + scroll_dsl.AddMember("scroll", scroll_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + scroll_dsl.Accept(writer); + return buffer.GetString(); +} +std::string ESScrollQueryBuilder::build_clear_scroll_body(std::string scroll_id) { + rapidjson::Document delete_scroll_dsl; + rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); + delete_scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); + delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + delete_scroll_dsl.Accept(writer); + return buffer.GetString(); +} + + std::string ESScrollQueryBuilder::build() { rapidjson::Document es_query_dsl; rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h index 5ab23dd41f12c6..b714d23c18d72e 100644 --- a/be/src/util/es_scroll_query.h +++ b/be/src/util/es_scroll_query.h @@ -38,6 +38,9 @@ class ESScrollQueryBuilder { _fields = fields; } + static std::string build_next_scroll_body(std::string scroll_id, std::string scroll); + static std::string build_clear_scroll_body(std::string scroll_id); + private: std::vector _fields; uint16_t _size; From b7aeaed96ea3c5e70a864373fa10637831380c81 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 14:35:10 +0800 Subject: [PATCH 14/17] Refactor Elasticsearch releated logic --- be/src/util/es_scan_reader.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp index 423e46934dde2d..412f739f57ca17 100644 --- a/be/src/util/es_scan_reader.cpp +++ b/be/src/util/es_scan_reader.cpp @@ -19,9 +19,6 @@ #include #include "es_scan_reader.h" #include "es_scroll_query.h" -#include "rapidjson/writer.h" -#include "rapidjson/document.h" -#include "rapidjson/stringbuffer.h" #include "common/logging.h" #include "common/status.h" #include From b98ce32958c72ad06de35bfaa7db45ab9fd9cca8 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 14:57:55 +0800 Subject: [PATCH 15/17] Enchance function params --- be/src/util/es_scroll_parser.cpp | 2 +- be/src/util/es_scroll_parser.h | 2 +- be/src/util/es_scroll_query.cpp | 4 ++-- be/src/util/es_scroll_query.h | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/be/src/util/es_scroll_parser.cpp b/be/src/util/es_scroll_parser.cpp index 1a12c4f48ed6e6..cf266c82de1a06 100644 --- a/be/src/util/es_scroll_parser.cpp +++ b/be/src/util/es_scroll_parser.cpp @@ -36,7 +36,7 @@ ScrollParser::~ScrollParser() { } -Status ScrollParser::parse(std::string scroll_result) { +Status ScrollParser::parse(const std::string scroll_result) { rapidjson::Document document_node; document_node.Parse<0>(scroll_result.c_str()); if (!document_node.HasMember(FIELD_SCROLL_ID)) { diff --git a/be/src/util/es_scroll_parser.h b/be/src/util/es_scroll_parser.h index 4a7a850a40bdb6..80b6a62a622b43 100644 --- a/be/src/util/es_scroll_parser.h +++ b/be/src/util/es_scroll_parser.h @@ -28,7 +28,7 @@ class ScrollParser { std::string get_scroll_id(); bool count(); uint32_t total(); - Status parse(std::string scroll_result); + Status parse(const std::string scroll_result); bool has_next(); void set_batch_size(int batch_size) { _batch_size = batch_size; diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp index e29a1be7d6ab68..461bd55160380c 100644 --- a/be/src/util/es_scroll_query.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -33,7 +33,7 @@ ESScrollQueryBuilder::~ESScrollQueryBuilder() { } -std::string ESScrollQueryBuilder::build_next_scroll_body(std::string scroll_id, std::string scroll) { +std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string scroll_id, std::string scroll) { rapidjson::Document scroll_dsl; rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); scroll_dsl.SetObject(); @@ -46,7 +46,7 @@ std::string ESScrollQueryBuilder::build_next_scroll_body(std::string scroll_id, scroll_dsl.Accept(writer); return buffer.GetString(); } -std::string ESScrollQueryBuilder::build_clear_scroll_body(std::string scroll_id) { +std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string scroll_id) { rapidjson::Document delete_scroll_dsl; rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); delete_scroll_dsl.SetObject(); diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h index b714d23c18d72e..58293ef51fbd90 100644 --- a/be/src/util/es_scroll_query.h +++ b/be/src/util/es_scroll_query.h @@ -38,8 +38,8 @@ class ESScrollQueryBuilder { _fields = fields; } - static std::string build_next_scroll_body(std::string scroll_id, std::string scroll); - static std::string build_clear_scroll_body(std::string scroll_id); + static std::string build_next_scroll_body(const std::string scroll_id, std::string scroll); + static std::string build_clear_scroll_body(const std::string scroll_id); private: std::vector _fields; From 647f4e141e283940fbfe3c2e428004259dcab83c Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 15:22:09 +0800 Subject: [PATCH 16/17] Enchance function params --- be/src/util/es_scan_reader.cpp | 14 +++++--------- be/src/util/es_scan_reader.h | 2 +- be/src/util/es_scroll_parser.cpp | 2 +- be/src/util/es_scroll_parser.h | 2 +- be/src/util/es_scroll_query.cpp | 4 ++-- be/src/util/es_scroll_query.h | 6 +++--- 6 files changed, 13 insertions(+), 17 deletions(-) diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp index 412f739f57ca17..36005c84f4e5ed 100644 --- a/be/src/util/es_scan_reader.cpp +++ b/be/src/util/es_scan_reader.cpp @@ -56,10 +56,6 @@ ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std:: } ESScanReader::~ESScanReader() { - if (_cached_response != nullptr) { - free(_cached_response); - _cached_response = nullptr; - } } Status ESScanReader::open() { @@ -68,14 +64,14 @@ Status ESScanReader::open() { _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); // phase open, we cached the first response for `get_next` phase - _network_client.execute_post_request(_query, _cached_response); + _network_client.execute_post_request(_query, &_cached_response); long status = _network_client.get_http_status(); if (status != 200) { LOG(WARNING) << "invalid response http status for open: " << status; - return Status(*_cached_response); + return Status(_cached_response); } - VLOG(1) << "open _cached response: " << *_cached_response; - RETURN_IF_ERROR(_parser.parse(*_cached_response)); + VLOG(1) << "open _cached response: " << _cached_response; + RETURN_IF_ERROR(_parser.parse(_cached_response)); _eos = _parser.has_next(); return Status::OK; } @@ -90,7 +86,7 @@ Status ESScanReader::get_next(bool* eos, std::string* response) { } _is_first = false; *eos = _eos; - *response = *_cached_response; + *response = _cached_response; return Status::OK; } RETURN_IF_ERROR(_network_client.init(_next_scroll_url)); diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h index 48e5fe6589ddac..45c413e7df3d6f 100644 --- a/be/src/util/es_scan_reader.h +++ b/be/src/util/es_scan_reader.h @@ -66,7 +66,7 @@ class ESScanReader { bool _eos; uint16_t _batch_size; - std::string *_cached_response = new std::string(); + std::string _cached_response; ScrollParser _parser; }; } diff --git a/be/src/util/es_scroll_parser.cpp b/be/src/util/es_scroll_parser.cpp index cf266c82de1a06..bd2069df98b2f0 100644 --- a/be/src/util/es_scroll_parser.cpp +++ b/be/src/util/es_scroll_parser.cpp @@ -36,7 +36,7 @@ ScrollParser::~ScrollParser() { } -Status ScrollParser::parse(const std::string scroll_result) { +Status ScrollParser::parse(const std::string& scroll_result) { rapidjson::Document document_node; document_node.Parse<0>(scroll_result.c_str()); if (!document_node.HasMember(FIELD_SCROLL_ID)) { diff --git a/be/src/util/es_scroll_parser.h b/be/src/util/es_scroll_parser.h index 80b6a62a622b43..bd9dbbbac42dff 100644 --- a/be/src/util/es_scroll_parser.h +++ b/be/src/util/es_scroll_parser.h @@ -28,7 +28,7 @@ class ScrollParser { std::string get_scroll_id(); bool count(); uint32_t total(); - Status parse(const std::string scroll_result); + Status parse(const std::string& scroll_result); bool has_next(); void set_batch_size(int batch_size) { _batch_size = batch_size; diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp index 461bd55160380c..4539297881ef77 100644 --- a/be/src/util/es_scroll_query.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -33,7 +33,7 @@ ESScrollQueryBuilder::~ESScrollQueryBuilder() { } -std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string scroll_id, std::string scroll) { +std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id, std::string& scroll) { rapidjson::Document scroll_dsl; rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); scroll_dsl.SetObject(); @@ -46,7 +46,7 @@ std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string scrol scroll_dsl.Accept(writer); return buffer.GetString(); } -std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string scroll_id) { +std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scroll_id) { rapidjson::Document delete_scroll_dsl; rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); delete_scroll_dsl.SetObject(); diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h index 58293ef51fbd90..7709c84f5a98da 100644 --- a/be/src/util/es_scroll_query.h +++ b/be/src/util/es_scroll_query.h @@ -34,12 +34,12 @@ class ESScrollQueryBuilder { void set_batch_size(uint16_t batch_size) { _size = batch_size; } - void set_selected_fields(std::vector& fields) { + void set_selected_fields(const std::vector& fields) { _fields = fields; } - static std::string build_next_scroll_body(const std::string scroll_id, std::string scroll); - static std::string build_clear_scroll_body(const std::string scroll_id); + static std::string build_next_scroll_body(const std::string& scroll_id, std::string& scroll); + static std::string build_clear_scroll_body(const std::string& scroll_id); private: std::vector _fields; From bae9519b0d535c79ed4e48665db32c1fb5b35edd Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 15:25:39 +0800 Subject: [PATCH 17/17] Enchance function params --- be/src/util/es_scroll_query.cpp | 2 +- be/src/util/es_scroll_query.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp index 4539297881ef77..57e936d2284801 100644 --- a/be/src/util/es_scroll_query.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -33,7 +33,7 @@ ESScrollQueryBuilder::~ESScrollQueryBuilder() { } -std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id, std::string& scroll) { +std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id, const std::string& scroll) { rapidjson::Document scroll_dsl; rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); scroll_dsl.SetObject(); diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h index 7709c84f5a98da..766a0e09b60574 100644 --- a/be/src/util/es_scroll_query.h +++ b/be/src/util/es_scroll_query.h @@ -38,7 +38,7 @@ class ESScrollQueryBuilder { _fields = fields; } - static std::string build_next_scroll_body(const std::string& scroll_id, std::string& scroll); + static std::string build_next_scroll_body(const std::string& scroll_id, const std::string& scroll); static std::string build_clear_scroll_body(const std::string& scroll_id); private: