Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions be/src/util/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";
const std::string REQUEST_SCROLL_TIME = "5m";

ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props) {
LOG(INFO) << "ESScanReader ";
ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) {
_target = target;
_batch_size = size;
_index = props.at(KEY_INDEX);
_type = props.at(KEY_TYPE);
if (props.find(KEY_USER_NAME) != props.end()) {
Expand All @@ -43,16 +41,18 @@ ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::
if (props.find(KEY_PASS_WORD) != props.end()){
_passwd = props.at(KEY_PASS_WORD);
}
if (props.find(KEY_SHARDS) != props.end()) {
_shards = props.at(KEY_SHARDS);
if (props.find(KEY_SHARD) != props.end()) {
_shards = props.at(KEY_SHARD);
}
if (props.find(KEY_QUERY) != props.end()) {
_query = props.at(KEY_QUERY);
}
std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());
_init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
_eos = false;
_parser.set_batch_size(size);
_parser.set_batch_size(_batch_size);
}

ESScanReader::~ESScanReader() {
Expand Down
7 changes: 4 additions & 3 deletions be/src/util/es_scan_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ class ESScanReader {
public:
static constexpr const char* KEY_USER_NAME = "user";
static constexpr const char* KEY_PASS_WORD = "passwd";
static constexpr const char* KEY_HOST_PORT = "host_port";
static constexpr const char* KEY_INDEX = "index";
static constexpr const char* KEY_TYPE = "type";
static constexpr const char* KEY_SHARDS = "shards";
static constexpr const char* KEY_SHARD = "shard_id";
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props);
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props);
~ESScanReader();

// launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next
Expand All @@ -64,7 +65,7 @@ class ESScanReader {
std::string _init_scroll_url;
std::string _next_scroll_url;
bool _eos;
uint16_t _batch_size;
int _batch_size;

std::string _cached_response;
ScrollParser _parser;
Expand Down
44 changes: 36 additions & 8 deletions be/src/util/es_scroll_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"

#include "util/es_scan_reader.h"
namespace doris {

ESScrollQueryBuilder::ESScrollQueryBuilder() {
Expand Down Expand Up @@ -58,32 +58,60 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr
return buffer.GetString();
}


std::string ESScrollQueryBuilder::build() {
std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
std::vector<std::shared_ptr<EsPredicate>> predicates) {
rapidjson::Document es_query_dsl;
rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator();
es_query_dsl.SetObject();
if (_fields.size() > 0) {
if (fields.size() > 0) {
rapidjson::Value source_node(rapidjson::kArrayType);
for (auto iter = _fields.begin(); iter != _fields.end(); iter++) {
for (auto iter = fields.begin(); iter != fields.end(); iter++) {
rapidjson::Value field(iter->c_str(), allocator);
source_node.PushBack(field, allocator);
}
es_query_dsl.AddMember("_source", source_node, allocator);
}

int size = atoi(properties.at(ESScanReader::BATCH_SIZE).c_str());
rapidjson::Value sort_node(rapidjson::kArrayType);
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);

es_query_dsl.AddMember("size", _size, allocator);
es_query_dsl.AddMember("size", size, allocator);

rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
es_query_dsl.Accept(writer);
std::string es_query_dsl_json = buffer.GetString();
return es_query_dsl_json;
return es_query_dsl_json;

}
// std::string ESScrollQueryBuilder::build() {
// rapidjson::Document es_query_dsl;
// rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator();
// es_query_dsl.SetObject();
// if (_fields.size() > 0) {
// rapidjson::Value source_node(rapidjson::kArrayType);
// for (auto iter = _fields.begin(); iter != _fields.end(); iter++) {
// rapidjson::Value field(iter->c_str(), allocator);
// source_node.PushBack(field, allocator);
// }
// es_query_dsl.AddMember("_source", source_node, allocator);
// }

// rapidjson::Value sort_node(rapidjson::kArrayType);
// rapidjson::Value field("_doc", allocator);
// sort_node.PushBack(field, allocator);
// es_query_dsl.AddMember("sort", sort_node, allocator);

// es_query_dsl.AddMember("size", _size, allocator);

// rapidjson::StringBuffer buffer;
// rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
// es_query_dsl.Accept(writer);
// std::string es_query_dsl_json = buffer.GetString();
// return es_query_dsl_json;
// }

}
26 changes: 15 additions & 11 deletions be/src/util/es_scroll_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once
#include<string>
#include<vector>
#include "exec/es_predicate.h"

namespace doris {

Expand All @@ -28,21 +29,24 @@ class ESScrollQueryBuilder {
ESScrollQueryBuilder();
~ESScrollQueryBuilder();
// build the query DSL for elasticsearch
std::string build();
// std::string build();


void set_batch_size(uint16_t batch_size) {
_size = batch_size;
}
void set_selected_fields(const std::vector<std::string>& fields) {
_fields = fields;
}
// void set_batch_size(uint16_t batch_size) {
// _size = batch_size;
// }
// void set_selected_fields(const std::vector<std::string>& fields) {
// _fields = fields;
// }

static std::string build_next_scroll_body(const std::string& scroll_id, const std::string& scroll);
static std::string build_clear_scroll_body(const std::string& scroll_id);

private:
std::vector<std::string> _fields;
uint16_t _size;
static std::string build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
std::vector<std::shared_ptr<EsPredicate>>);
// private:
// std::vector<std::string> _fields;
// uint16_t _size;
// };
};
}
11 changes: 5 additions & 6 deletions be/test/util/es_scan_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,17 @@ class MockESServerTest : public testing::Test {

TEST_F(MockESServerTest, workflow) {
std::string target = "http://127.0.0.1:29386";
ESScrollQueryBuilder scroll_query_builder;
scroll_query_builder.set_batch_size(1);
std::vector<std::string> fields = {"id", "value"};
scroll_query_builder.set_selected_fields(fields);
std::map<std::string, std::string> props;
props[ESScanReader::KEY_INDEX] = "tindex";
props[ESScanReader::KEY_TYPE] = "doc";
props[ESScanReader::KEY_USER_NAME] = "root";
props[ESScanReader::KEY_PASS_WORD] = "root";
props[ESScanReader::KEY_SHARDS] = "0";
props[ESScanReader::KEY_QUERY] = scroll_query_builder.build();
ESScanReader reader(target, 1, props);
props[ESScanReader::KEY_SHARD] = "0";
props[ESScanReader::KEY_BATCH_SIZE] = "1";
std::vector<std::shared_ptr<EsPredicate>> predicates;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates);
ESScanReader reader(target, props);
auto st = reader.open();
// ASSERT_TRUE(st.ok());
bool eos = false;
Expand Down