diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 88c4374fe6f618..ca061cfea88a0c 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -145,12 +145,18 @@ size_t HttpClient::on_response_data(const void* data, size_t length) { // return execute(callback); // } -Status HttpClient::execute_post_request(const std::string& post_data, std::string* response) { +Status HttpClient::execute_post_request(const std::string& payload, std::string* response) { set_method(POST); - set_post_body(post_data); + set_payload(payload); return execute(response); } +Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) { + set_method(DELETE); + set_payload(payload); + return execute(response); +} + Status HttpClient::execute(const std::function& callback) { _callback = &callback; auto code = curl_easy_perform(_curl); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 83a27b8d63646f..d54bc680d33aaf 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -61,8 +61,7 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); } - // you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up - void set_post_body(const std::string& post_body) { + void set_payload(const std::string& post_body) { curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length()); curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } @@ -114,7 +113,9 @@ class HttpClient { // a file to local_path Status download(const std::string& local_path); - Status execute_post_request(const std::string& post_data, std::string* response); + Status execute_post_request(const std::string& payload, std::string* response); + + Status execute_delete_request(const std::string& payload, std::string* response); // execute a simple method, and its response is saved in response argument Status execute(std::string* response); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 3fa44dc6fa1059..fb38438761d165 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -74,6 +74,9 @@ add_library(Util STATIC aes_util.cpp string_util.cpp md5.cpp + es_scan_reader.cpp + es_scroll_query.cpp + es_scroll_parser.cpp ) #ADD_BE_TEST(integer-array-test) @@ -87,3 +90,4 @@ add_library(Util STATIC #ADD_BE_TEST(bit-util-test) #ADD_BE_TEST(rle-test) ##ADD_BE_TEST(perf-counters-test) +##ADD_BE_TEST(es-scan-reader-test) \ No newline at end of file diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp new file mode 100644 index 00000000000000..36005c84f4e5ed --- /dev/null +++ b/be/src/util/es_scan_reader.cpp @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include "es_scan_reader.h" +#include "es_scroll_query.h" +#include "common/logging.h" +#include "common/status.h" +#include + +namespace doris { +const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields"; +const std::string REQUEST_SCROLL_PATH = "_scroll"; +const std::string REQUEST_PREFERENCE_PREFIX = "&preference=shards:"; +const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; +const std::string REQUEST_SEPARATOR = "/"; +const std::string REQUEST_SCROLL_TIME = "5m"; + +ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map& props) { + LOG(INFO) << "ESScanReader "; + _target = target; + _batch_size = size; + _index = props.at(KEY_INDEX); + _type = props.at(KEY_TYPE); + if (props.find(KEY_USER_NAME) != props.end()) { + _user_name = props.at(KEY_USER_NAME); + } + if (props.find(KEY_PASS_WORD) != props.end()){ + _passwd = props.at(KEY_PASS_WORD); + } + if (props.find(KEY_SHARDS) != props.end()) { + _shards = props.at(KEY_SHARDS); + } + if (props.find(KEY_QUERY) != props.end()) { + _query = props.at(KEY_QUERY); + } + _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH; + _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; + _eos = false; + _parser.set_batch_size(size); +} + +ESScanReader::~ESScanReader() { +} + +Status ESScanReader::open() { + _is_first = true; + RETURN_IF_ERROR(_network_client.init(_init_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + // phase open, we cached the first response for `get_next` phase + _network_client.execute_post_request(_query, &_cached_response); + long status = _network_client.get_http_status(); + if (status != 200) { + LOG(WARNING) << "invalid response http status for open: " << status; + return Status(_cached_response); + } + VLOG(1) << "open _cached response: " << _cached_response; + RETURN_IF_ERROR(_parser.parse(_cached_response)); + _eos = _parser.has_next(); + return Status::OK; +} + +Status ESScanReader::get_next(bool* eos, std::string* response) { + // if is first scroll request, should return the cached response + if (_is_first) { + // maybe the index or shard is empty + if (_eos) { + *eos = true; + return Status::OK; + } + _is_first = false; + *eos = _eos; + *response = _cached_response; + return Status::OK; + } + RETURN_IF_ERROR(_network_client.init(_next_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + _network_client.set_timeout_ms(5 * 1000); + RETURN_IF_ERROR(_network_client.execute_post_request(ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), response)); + long status = _network_client.get_http_status(); + if (status == 404) { + LOG(WARNING) << "request scroll search failure 404[" + << ", response: " << (response->empty() ? "empty response" : *response); + return Status("No search context found for " + _scroll_id); + } + if (status != 200) { + LOG(WARNING) << "request scroll search failure[" + << "http status" << status + << ", response: " << (response->empty() ? "empty response" : *response); + if (status == 404) { + return Status("No search context found for " + _scroll_id); + } + return Status("request scroll search failure: " + (response->empty() ? "empty response" : *response)); + } + RETURN_IF_ERROR(_parser.parse(*response)); + *eos = _eos = _parser.has_next(); + return Status::OK; +} + +Status ESScanReader::close() { + std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH; + RETURN_IF_ERROR(_network_client.init(scratch_target)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_method(DELETE); + _network_client.set_content_type("application/json"); + _network_client.set_timeout_ms(5 * 1000); + std::string response; + RETURN_IF_ERROR(_network_client.execute_delete_request(ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response)); + if (_network_client.get_http_status() == 200) { + return Status::OK; + } else { + return Status("es_scan_reader delete scroll context failure"); + } +} +} diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h new file mode 100644 index 00000000000000..45c413e7df3d6f --- /dev/null +++ b/be/src/util/es_scan_reader.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "http/http_client.h" +#include "es_scroll_parser.h" + +using std::string; + +namespace doris { + +class Status; + +class ESScanReader { + +public: + static constexpr const char* KEY_USER_NAME = "user"; + static constexpr const char* KEY_PASS_WORD = "passwd"; + static constexpr const char* KEY_INDEX = "index"; + static constexpr const char* KEY_TYPE = "type"; + static constexpr const char* KEY_SHARDS = "shards"; + static constexpr const char* KEY_QUERY = "query"; + static constexpr const char* KEY_BATCH_SIZE = "batch_size"; + ESScanReader(const std::string& target, uint16_t size, const std::map& props); + ~ESScanReader(); + + // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next + Status open(); + // invoke get_next to get next batch documents from elasticsearch + Status get_next(bool *eos, std::string* response); + // clear scroll context from elasticsearch + Status close(); + +private: + std::string _target; + std::string _user_name; + std::string _passwd; + std::string _scroll_id; + HttpClient _network_client; + std::string _index; + std::string _type; + // push down filter + std::string _query; + // elaticsearch shards to fetch document + std::string _shards; + // distinguish the first scroll phase and the following scroll + bool _is_first; + std::string _init_scroll_url; + std::string _next_scroll_url; + bool _eos; + uint16_t _batch_size; + + std::string _cached_response; + ScrollParser _parser; +}; +} + diff --git a/be/src/util/es_scroll_parser.cpp b/be/src/util/es_scroll_parser.cpp new file mode 100644 index 00000000000000..bd2069df98b2f0 --- /dev/null +++ b/be/src/util/es_scroll_parser.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "es_scroll_parser.h" +#include "rapidjson/document.h" +#include "common/logging.h" +#include "common/status.h" + +namespace doris { + +const char* FIELD_SCROLL_ID = "_scroll_id"; +const char* FIELD_HITS = "hits"; +const char* FIELD_INNER_HITS = "hits"; +const char* FIELD_SOURCE = "_source"; +const char* FIELD_TOTAL = "total"; + +ScrollParser::ScrollParser() { + _eos = false; + _total = 0; +} + +ScrollParser::~ScrollParser() { +} + + +Status ScrollParser::parse(const std::string& scroll_result) { + rapidjson::Document document_node; + document_node.Parse<0>(scroll_result.c_str()); + if (!document_node.HasMember(FIELD_SCROLL_ID)) { + return Status("maybe not a scroll request"); + } + rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID]; + _scroll_id = scroll_node.GetString(); + // { hits: { total : 2, "hits" : [ {}, {}, {} ]}} + rapidjson::Value &outer_hits_node = document_node[FIELD_HITS]; + rapidjson::Value &total = document_node[FIELD_TOTAL]; + _total = total.GetInt(); + if (_total == 0) { + _eos = true; + return Status::OK; + } + VLOG(1) << "es_scan_reader total hits: " << _total << " documents"; + rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; + if (!inner_hits_node.IsArray()) { + return Status("invalid response from elasticsearch"); + } + _size = inner_hits_node.Size(); + if (_size < _batch_size) { + _eos = true; + } + return Status::OK; +} + +bool ScrollParser::has_next() { + return _eos; +} + +bool ScrollParser::count() { + return _size; +} + +std::string ScrollParser::get_scroll_id() { + return _scroll_id; +} +} diff --git a/be/src/util/es_scroll_parser.h b/be/src/util/es_scroll_parser.h new file mode 100644 index 00000000000000..bd9dbbbac42dff --- /dev/null +++ b/be/src/util/es_scroll_parser.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include + +namespace doris { + +class Status; +class ScrollParser { + +public: + ScrollParser(); + ~ScrollParser(); + std::string get_scroll_id(); + bool count(); + uint32_t total(); + Status parse(const std::string& scroll_result); + bool has_next(); + void set_batch_size(int batch_size) { + _batch_size = batch_size; + } + +private: + std::string _scroll_id; + bool _eos; + int _total; + int _size; + int _batch_size; +}; +} diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp new file mode 100644 index 00000000000000..57e936d2284801 --- /dev/null +++ b/be/src/util/es_scroll_query.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "es_scroll_query.h" +#include +#include +#include "common/logging.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +namespace doris { + +ESScrollQueryBuilder::ESScrollQueryBuilder() { + +} + +ESScrollQueryBuilder::~ESScrollQueryBuilder() { + +} + +std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id, const std::string& scroll) { + rapidjson::Document scroll_dsl; + rapidjson::Document::AllocatorType &allocator = scroll_dsl.GetAllocator(); + scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); + scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::Value scroll_value(scroll.c_str(), allocator); + scroll_dsl.AddMember("scroll", scroll_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + scroll_dsl.Accept(writer); + return buffer.GetString(); +} +std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scroll_id) { + rapidjson::Document delete_scroll_dsl; + rapidjson::Document::AllocatorType &allocator = delete_scroll_dsl.GetAllocator(); + delete_scroll_dsl.SetObject(); + rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); + delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + delete_scroll_dsl.Accept(writer); + return buffer.GetString(); +} + + +std::string ESScrollQueryBuilder::build() { + rapidjson::Document es_query_dsl; + rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); + es_query_dsl.SetObject(); + if (_fields.size() > 0) { + rapidjson::Value source_node(rapidjson::kArrayType); + for (auto iter = _fields.begin(); iter != _fields.end(); iter++) { + rapidjson::Value field(iter->c_str(), allocator); + source_node.PushBack(field, allocator); + } + es_query_dsl.AddMember("_source", source_node, allocator); + } + + rapidjson::Value sort_node(rapidjson::kArrayType); + rapidjson::Value field("_doc", allocator); + sort_node.PushBack(field, allocator); + es_query_dsl.AddMember("sort", sort_node, allocator); + + es_query_dsl.AddMember("size", _size, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + es_query_dsl.Accept(writer); + std::string es_query_dsl_json = buffer.GetString(); + return es_query_dsl_json; +} + +} diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h new file mode 100644 index 00000000000000..766a0e09b60574 --- /dev/null +++ b/be/src/util/es_scroll_query.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#pragma once +#include +#include + +namespace doris { + +class ESScrollQueryBuilder { + +public: + ESScrollQueryBuilder(); + ~ESScrollQueryBuilder(); + // build the query DSL for elasticsearch + std::string build(); + + + void set_batch_size(uint16_t batch_size) { + _size = batch_size; + } + void set_selected_fields(const std::vector& fields) { + _fields = fields; + } + + static std::string build_next_scroll_body(const std::string& scroll_id, const std::string& scroll); + static std::string build_clear_scroll_body(const std::string& scroll_id); + +private: + std::vector _fields; + uint16_t _size; +}; +} diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 0ac6e774d4661f..ac87fbab2f3dfb 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -38,3 +38,4 @@ ADD_BE_TEST(uid_util_test) ADD_BE_TEST(arena_test) ADD_BE_TEST(aes_util_test) ADD_BE_TEST(md5_test) +ADD_BE_TEST(es_scan_reader_test) diff --git a/be/test/util/es_scan_reader_test.cpp b/be/test/util/es_scan_reader_test.cpp new file mode 100644 index 00000000000000..97bf654384e63c --- /dev/null +++ b/be/test/util/es_scan_reader_test.cpp @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "util/es_scan_reader.h" +#include "util/es_scroll_query.h" +#include +#include "common/logging.h" +#include "http/ev_http_server.h" +#include "http/http_channel.h" +#include "http/http_handler.h" +#include "http/http_request.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" +#include +#include +#include + +namespace doris { + +class RestSearchAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + req->add_output_header(HttpHeaders::CONTENT_TYPE, "application/json"); + if (req->method() == HttpMethod::POST) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + if (post_doc.HasMember("size")) { + rapidjson::Value& size_value = post_doc["size"]; + size = size_value.GetInt(); + } + std::string _scroll_id(std::to_string(size)); + rapidjson::Document search_result; + rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator(); + search_result.SetObject(); + rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator); + search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + rapidjson::Value inner_hits(rapidjson::kArrayType); + rapidjson::Value source_docuement(rapidjson::kObjectType); + source_docuement.AddMember("id", 1, allocator); + rapidjson::Value value_node("1", allocator); + source_docuement.AddMember("value", value_node, allocator); + inner_hits.PushBack(source_docuement, allocator); + outer_hits.AddMember("hits", inner_hits, allocator); + search_result.AddMember("hits", outer_hits, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + search_result.Accept(writer); + //send DELETE scorll post request + std::string search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, search_result_json); + } else { + std::string response = "test1"; + HttpChannel::send_reply(req, response); + } + } +}; + +class RestSearchScrollAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + if (req->method() == HttpMethod::POST) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + std::string scroll_id; + if (!post_doc.HasMember("scroll_id")) { + HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request"); + return; + } else { + rapidjson::Value& scroll_id_value = post_doc["scroll_id"]; + scroll_id = scroll_id_value.GetString(); + int offset = atoi(scroll_id.c_str()); + if (offset > 10) { + rapidjson::Document end_search_result; + rapidjson::Document::AllocatorType &allocator = end_search_result.GetAllocator(); + end_search_result.SetObject(); + rapidjson::Value scroll_id_value("11", allocator); + end_search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + end_search_result.AddMember("hits", outer_hits, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + end_search_result.Accept(writer); + //send DELETE scorll post request + std::string end_search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, end_search_result_json); + return; + } else { + int start = offset + 1; + rapidjson::Document search_result; + rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator(); + search_result.SetObject(); + rapidjson::Value scroll_id_value(std::to_string(start).c_str(), allocator); + search_result.AddMember("_scroll_id", scroll_id_value, allocator); + + rapidjson::Value outer_hits(rapidjson::kObjectType); + outer_hits.AddMember("total", 10, allocator); + rapidjson::Value inner_hits(rapidjson::kArrayType); + rapidjson::Value source_docuement(rapidjson::kObjectType); + source_docuement.AddMember("id", start, allocator); + rapidjson::Value value_node(std::to_string(start).c_str(), allocator); + source_docuement.AddMember("value", value_node, allocator); + inner_hits.PushBack(source_docuement, allocator); + outer_hits.AddMember("hits", inner_hits, allocator); + search_result.AddMember("hits", outer_hits, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + search_result.Accept(writer); + //send DELETE scorll post request + std::string search_result_json = buffer.GetString(); + HttpChannel::send_reply(req, search_result_json); + return; + } + + } + } + } +}; + +class RestClearScrollAction : public HttpHandler { +public: + void handle(HttpRequest* req) override { + std::string user; + std::string passwd; + if (!parse_basic_auth(*req, &user, &passwd) || user != "root") { + HttpChannel::send_basic_challenge(req, "abc"); + return; + } + if (req->method() == HttpMethod::DELETE) { + std::string post_body = req->get_request_body(); + rapidjson::Document post_doc; + post_doc.Parse<0>(post_body.c_str()); + int size = 1; + std::string scroll_id; + if (!post_doc.HasMember("scroll_id")) { + HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request"); + return; + } else { + rapidjson::Document clear_scroll_result; + rapidjson::Document::AllocatorType &allocator = clear_scroll_result.GetAllocator(); + clear_scroll_result.SetObject(); + clear_scroll_result.AddMember("succeeded", true, allocator); + clear_scroll_result.AddMember("num_freed", 1, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + clear_scroll_result.Accept(writer); + std::string clear_scroll_result_json = buffer.GetString(); + HttpChannel::send_reply(req, clear_scroll_result_json); + return; + } + } + } +}; + +static RestSearchAction rest_search_action = RestSearchAction(); +static RestSearchScrollAction rest_search_scroll_action = RestSearchScrollAction(); +static RestClearScrollAction rest_clear_scroll_action = RestClearScrollAction(); +static EvHttpServer* mock_es_server = nullptr; + +class MockESServerTest : public testing::Test { +public: + MockESServerTest() { } + ~MockESServerTest() override { } + + static void SetUpTestCase() { + mock_es_server = new EvHttpServer(29386); + mock_es_server->register_handler(POST, "/{index}/{type}/_search", &rest_search_action); + mock_es_server->register_handler(POST, "/_search/scroll", &rest_search_scroll_action); + mock_es_server->register_handler(DELETE, "/_search/scroll", &rest_clear_scroll_action); + mock_es_server->start(); + } + + static void TearDownTestCase() { + delete mock_es_server; + } +}; + +TEST_F(MockESServerTest, workflow) { + std::string target = "http://127.0.0.1:29386"; + ESScrollQueryBuilder scroll_query_builder; + scroll_query_builder.set_batch_size(1); + std::vector fields = {"id", "value"}; + scroll_query_builder.set_selected_fields(fields); + std::map props; + props[ESScanReader::KEY_INDEX] = "tindex"; + props[ESScanReader::KEY_TYPE] = "doc"; + props[ESScanReader::KEY_USER_NAME] = "root"; + props[ESScanReader::KEY_PASS_WORD] = "root"; + props[ESScanReader::KEY_SHARDS] = "0"; + props[ESScanReader::KEY_QUERY] = scroll_query_builder.build(); + ESScanReader reader(target, 1, props); + auto st = reader.open(); + // ASSERT_TRUE(st.ok()); + bool eos = false; + while(!eos){ + std::string response; + st = reader.get_next(&eos, &response); + if(eos) { + break; + } + rapidjson::Document docuemnt_node; + docuemnt_node.Parse<0>(response.c_str()); + rapidjson::Value &scroll_node = docuemnt_node["_scroll_id"]; + std::string _scroll_id = scroll_node.GetString(); + int id = atoi(_scroll_id.c_str()); + rapidjson::Value &outer_hits_node = docuemnt_node["hits"]; + rapidjson::Value &inner_hits_node = outer_hits_node["hits"]; + rapidjson::Value &source_node = inner_hits_node[0]; + rapidjson::Value &id_node = source_node["id"]; + rapidjson::Value &value_node = source_node["value"]; + ASSERT_EQ(id, id_node.GetInt()); + std::string value = value_node.GetString(); + ASSERT_EQ(id, atoi(value.c_str())); + ASSERT_TRUE(st.ok()); + } + auto cst = reader.close(); + ASSERT_TRUE(cst.ok()); +} +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}