From d36f1dc75641cb0aeac437922dbd966fbd9f7217 Mon Sep 17 00:00:00 2001 From: wuyunfeng Date: Mon, 25 Mar 2019 19:26:16 +0800 Subject: [PATCH] Moidfy scan interface --- be/src/util/es_scan_reader.cpp | 12 ++++---- be/src/util/es_scan_reader.h | 7 +++-- be/src/util/es_scroll_query.cpp | 44 +++++++++++++++++++++++----- be/src/util/es_scroll_query.h | 26 +++++++++------- be/test/util/es_scan_reader_test.cpp | 11 ++++--- 5 files changed, 66 insertions(+), 34 deletions(-) diff --git a/be/src/util/es_scan_reader.cpp b/be/src/util/es_scan_reader.cpp index 36005c84f4e5ed..097ea5d865cebb 100644 --- a/be/src/util/es_scan_reader.cpp +++ b/be/src/util/es_scan_reader.cpp @@ -31,10 +31,8 @@ const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; const std::string REQUEST_SEPARATOR = "/"; const std::string REQUEST_SCROLL_TIME = "5m"; -ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map& props) { - LOG(INFO) << "ESScanReader "; +ESScanReader::ESScanReader(const std::string& target, const std::map& props) { _target = target; - _batch_size = size; _index = props.at(KEY_INDEX); _type = props.at(KEY_TYPE); if (props.find(KEY_USER_NAME) != props.end()) { @@ -43,16 +41,18 @@ ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std:: if (props.find(KEY_PASS_WORD) != props.end()){ _passwd = props.at(KEY_PASS_WORD); } - if (props.find(KEY_SHARDS) != props.end()) { - _shards = props.at(KEY_SHARDS); + if (props.find(KEY_SHARD) != props.end()) { + _shards = props.at(KEY_SHARD); } if (props.find(KEY_QUERY) != props.end()) { _query = props.at(KEY_QUERY); } + std::string batch_size_str = props.at(KEY_BATCH_SIZE); + _batch_size = atoi(batch_size_str.c_str()); _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH; _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; _eos = false; - _parser.set_batch_size(size); + _parser.set_batch_size(_batch_size); } ESScanReader::~ESScanReader() { diff --git a/be/src/util/es_scan_reader.h b/be/src/util/es_scan_reader.h index 45c413e7df3d6f..8b2d13776635af 100644 --- a/be/src/util/es_scan_reader.h +++ b/be/src/util/es_scan_reader.h @@ -32,12 +32,13 @@ class ESScanReader { public: static constexpr const char* KEY_USER_NAME = "user"; static constexpr const char* KEY_PASS_WORD = "passwd"; + static constexpr const char* KEY_HOST_PORT = "host_port"; static constexpr const char* KEY_INDEX = "index"; static constexpr const char* KEY_TYPE = "type"; - static constexpr const char* KEY_SHARDS = "shards"; + static constexpr const char* KEY_SHARD = "shard_id"; static constexpr const char* KEY_QUERY = "query"; static constexpr const char* KEY_BATCH_SIZE = "batch_size"; - ESScanReader(const std::string& target, uint16_t size, const std::map& props); + ESScanReader(const std::string& target, const std::map& props); ~ESScanReader(); // launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next @@ -64,7 +65,7 @@ class ESScanReader { std::string _init_scroll_url; std::string _next_scroll_url; bool _eos; - uint16_t _batch_size; + int _batch_size; std::string _cached_response; ScrollParser _parser; diff --git a/be/src/util/es_scroll_query.cpp b/be/src/util/es_scroll_query.cpp index 57e936d2284801..2948ec31881148 100644 --- a/be/src/util/es_scroll_query.cpp +++ b/be/src/util/es_scroll_query.cpp @@ -22,7 +22,7 @@ #include "rapidjson/document.h" #include "rapidjson/stringbuffer.h" #include "rapidjson/writer.h" - +#include "util/es_scan_reader.h" namespace doris { ESScrollQueryBuilder::ESScrollQueryBuilder() { @@ -58,32 +58,60 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr return buffer.GetString(); } - -std::string ESScrollQueryBuilder::build() { +std::string ESScrollQueryBuilder::build(const std::map& properties, + const std::vector& fields, + std::vector> predicates) { rapidjson::Document es_query_dsl; rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); es_query_dsl.SetObject(); - if (_fields.size() > 0) { + if (fields.size() > 0) { rapidjson::Value source_node(rapidjson::kArrayType); - for (auto iter = _fields.begin(); iter != _fields.end(); iter++) { + for (auto iter = fields.begin(); iter != fields.end(); iter++) { rapidjson::Value field(iter->c_str(), allocator); source_node.PushBack(field, allocator); } es_query_dsl.AddMember("_source", source_node, allocator); } - + int size = atoi(properties.at(ESScanReader::BATCH_SIZE).c_str()); rapidjson::Value sort_node(rapidjson::kArrayType); rapidjson::Value field("_doc", allocator); sort_node.PushBack(field, allocator); es_query_dsl.AddMember("sort", sort_node, allocator); - es_query_dsl.AddMember("size", _size, allocator); + es_query_dsl.AddMember("size", size, allocator); rapidjson::StringBuffer buffer; rapidjson::Writer writer(buffer); es_query_dsl.Accept(writer); std::string es_query_dsl_json = buffer.GetString(); - return es_query_dsl_json; + return es_query_dsl_json; + } +// std::string ESScrollQueryBuilder::build() { +// rapidjson::Document es_query_dsl; +// rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); +// es_query_dsl.SetObject(); +// if (_fields.size() > 0) { +// rapidjson::Value source_node(rapidjson::kArrayType); +// for (auto iter = _fields.begin(); iter != _fields.end(); iter++) { +// rapidjson::Value field(iter->c_str(), allocator); +// source_node.PushBack(field, allocator); +// } +// es_query_dsl.AddMember("_source", source_node, allocator); +// } + +// rapidjson::Value sort_node(rapidjson::kArrayType); +// rapidjson::Value field("_doc", allocator); +// sort_node.PushBack(field, allocator); +// es_query_dsl.AddMember("sort", sort_node, allocator); + +// es_query_dsl.AddMember("size", _size, allocator); + +// rapidjson::StringBuffer buffer; +// rapidjson::Writer writer(buffer); +// es_query_dsl.Accept(writer); +// std::string es_query_dsl_json = buffer.GetString(); +// return es_query_dsl_json; +// } } diff --git a/be/src/util/es_scroll_query.h b/be/src/util/es_scroll_query.h index 766a0e09b60574..f30378d30c3b1e 100644 --- a/be/src/util/es_scroll_query.h +++ b/be/src/util/es_scroll_query.h @@ -19,6 +19,7 @@ #pragma once #include #include +#include "exec/es_predicate.h" namespace doris { @@ -28,21 +29,24 @@ class ESScrollQueryBuilder { ESScrollQueryBuilder(); ~ESScrollQueryBuilder(); // build the query DSL for elasticsearch - std::string build(); + // std::string build(); - void set_batch_size(uint16_t batch_size) { - _size = batch_size; - } - void set_selected_fields(const std::vector& fields) { - _fields = fields; - } + // void set_batch_size(uint16_t batch_size) { + // _size = batch_size; + // } + // void set_selected_fields(const std::vector& fields) { + // _fields = fields; + // } static std::string build_next_scroll_body(const std::string& scroll_id, const std::string& scroll); static std::string build_clear_scroll_body(const std::string& scroll_id); - -private: - std::vector _fields; - uint16_t _size; + static std::string build(const std::map& properties, + const std::vector& fields, + std::vector>); +// private: +// std::vector _fields; +// uint16_t _size; +// }; }; } diff --git a/be/test/util/es_scan_reader_test.cpp b/be/test/util/es_scan_reader_test.cpp index 97bf654384e63c..7b1df013a19ddd 100644 --- a/be/test/util/es_scan_reader_test.cpp +++ b/be/test/util/es_scan_reader_test.cpp @@ -213,18 +213,17 @@ class MockESServerTest : public testing::Test { TEST_F(MockESServerTest, workflow) { std::string target = "http://127.0.0.1:29386"; - ESScrollQueryBuilder scroll_query_builder; - scroll_query_builder.set_batch_size(1); std::vector fields = {"id", "value"}; - scroll_query_builder.set_selected_fields(fields); std::map props; props[ESScanReader::KEY_INDEX] = "tindex"; props[ESScanReader::KEY_TYPE] = "doc"; props[ESScanReader::KEY_USER_NAME] = "root"; props[ESScanReader::KEY_PASS_WORD] = "root"; - props[ESScanReader::KEY_SHARDS] = "0"; - props[ESScanReader::KEY_QUERY] = scroll_query_builder.build(); - ESScanReader reader(target, 1, props); + props[ESScanReader::KEY_SHARD] = "0"; + props[ESScanReader::KEY_BATCH_SIZE] = "1"; + std::vector> predicates; + props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates); + ESScanReader reader(target, props); auto st = reader.open(); // ASSERT_TRUE(st.ok()); bool eos = false;