Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,18 @@ size_t HttpClient::on_response_data(const void* data, size_t length) {
// return execute(callback);
// }

Status HttpClient::execute_post_request(const std::string& post_data, std::string* response) {
Status HttpClient::execute_post_request(const std::string& payload, std::string* response) {
set_method(POST);
set_post_body(post_data);
set_payload(payload);
return execute(response);
}

Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
set_method(DELETE);
set_payload(payload);
return execute(response);
}

Status HttpClient::execute(const std::function<bool(const void* data, size_t length)>& callback) {
_callback = &callback;
auto code = curl_easy_perform(_curl);
Expand Down
7 changes: 4 additions & 3 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class HttpClient {
curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list);
}

// you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up
void set_post_body(const std::string& post_body) {
void set_payload(const std::string& post_body) {
curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, (long)post_body.length());
curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str());
}
Expand Down Expand Up @@ -114,7 +113,9 @@ class HttpClient {
// a file to local_path
Status download(const std::string& local_path);

Status execute_post_request(const std::string& post_data, std::string* response);
Status execute_post_request(const std::string& payload, std::string* response);

Status execute_delete_request(const std::string& payload, std::string* response);

// execute a simple method, and its response is saved in response argument
Status execute(std::string* response);
Expand Down
4 changes: 4 additions & 0 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ add_library(Util STATIC
aes_util.cpp
string_util.cpp
md5.cpp
es_scan_reader.cpp
es_scroll_query.cpp
es_scroll_parser.cpp
)

#ADD_BE_TEST(integer-array-test)
Expand All @@ -87,3 +90,4 @@ add_library(Util STATIC
#ADD_BE_TEST(bit-util-test)
#ADD_BE_TEST(rle-test)
##ADD_BE_TEST(perf-counters-test)
##ADD_BE_TEST(es-scan-reader-test)
132 changes: 132 additions & 0 deletions be/src/util/es_scan_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string>
#include <sstream>
#include "es_scan_reader.h"
#include "es_scroll_query.h"
#include "common/logging.h"
#include "common/status.h"
#include <map>

namespace doris {
const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields";
const std::string REQUEST_SCROLL_PATH = "_scroll";
const std::string REQUEST_PREFERENCE_PREFIX = "&preference=shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";
const std::string REQUEST_SCROLL_TIME = "5m";

ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props) {
LOG(INFO) << "ESScanReader ";
_target = target;
_batch_size = size;
_index = props.at(KEY_INDEX);
_type = props.at(KEY_TYPE);
if (props.find(KEY_USER_NAME) != props.end()) {
_user_name = props.at(KEY_USER_NAME);
}
if (props.find(KEY_PASS_WORD) != props.end()){
_passwd = props.at(KEY_PASS_WORD);
}
if (props.find(KEY_SHARDS) != props.end()) {
_shards = props.at(KEY_SHARDS);
}
if (props.find(KEY_QUERY) != props.end()) {
_query = props.at(KEY_QUERY);
}
_init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
_eos = false;
_parser.set_batch_size(size);
}

ESScanReader::~ESScanReader() {
}

Status ESScanReader::open() {
_is_first = true;
RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
// phase open, we cached the first response for `get_next` phase
_network_client.execute_post_request(_query, &_cached_response);
long status = _network_client.get_http_status();
if (status != 200) {
LOG(WARNING) << "invalid response http status for open: " << status;
return Status(_cached_response);
}
VLOG(1) << "open _cached response: " << _cached_response;
RETURN_IF_ERROR(_parser.parse(_cached_response));
_eos = _parser.has_next();
return Status::OK;
}

Status ESScanReader::get_next(bool* eos, std::string* response) {
// if is first scroll request, should return the cached response
if (_is_first) {
// maybe the index or shard is empty
if (_eos) {
*eos = true;
return Status::OK;
}
_is_first = false;
*eos = _eos;
*response = _cached_response;
return Status::OK;
}
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(5 * 1000);
RETURN_IF_ERROR(_network_client.execute_post_request(ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), response));
long status = _network_client.get_http_status();
if (status == 404) {
LOG(WARNING) << "request scroll search failure 404["
<< ", response: " << (response->empty() ? "empty response" : *response);
return Status("No search context found for " + _scroll_id);
}
if (status != 200) {
LOG(WARNING) << "request scroll search failure["
<< "http status" << status
<< ", response: " << (response->empty() ? "empty response" : *response);
if (status == 404) {
return Status("No search context found for " + _scroll_id);
}
return Status("request scroll search failure: " + (response->empty() ? "empty response" : *response));
}
RETURN_IF_ERROR(_parser.parse(*response));
*eos = _eos = _parser.has_next();
return Status::OK;
}

Status ESScanReader::close() {
std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
RETURN_IF_ERROR(_network_client.init(scratch_target));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_method(DELETE);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(5 * 1000);
std::string response;
RETURN_IF_ERROR(_network_client.execute_delete_request(ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
if (_network_client.get_http_status() == 200) {
return Status::OK;
} else {
return Status("es_scan_reader delete scroll context failure");
}
}
}
73 changes: 73 additions & 0 deletions be/src/util/es_scan_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>
#include "http/http_client.h"
#include "es_scroll_parser.h"

using std::string;

namespace doris {

class Status;

class ESScanReader {

public:
static constexpr const char* KEY_USER_NAME = "user";
static constexpr const char* KEY_PASS_WORD = "passwd";
static constexpr const char* KEY_INDEX = "index";
static constexpr const char* KEY_TYPE = "type";
static constexpr const char* KEY_SHARDS = "shards";
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props);
~ESScanReader();

// launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next
Status open();
// invoke get_next to get next batch documents from elasticsearch
Status get_next(bool *eos, std::string* response);
// clear scroll context from elasticsearch
Status close();

private:
std::string _target;
std::string _user_name;
std::string _passwd;
std::string _scroll_id;
HttpClient _network_client;
std::string _index;
std::string _type;
// push down filter
std::string _query;
// elaticsearch shards to fetch document
std::string _shards;
// distinguish the first scroll phase and the following scroll
bool _is_first;
std::string _init_scroll_url;
std::string _next_scroll_url;
bool _eos;
uint16_t _batch_size;

std::string _cached_response;
ScrollParser _parser;
};
}

78 changes: 78 additions & 0 deletions be/src/util/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "es_scroll_parser.h"
#include "rapidjson/document.h"
#include "common/logging.h"
#include "common/status.h"

namespace doris {

const char* FIELD_SCROLL_ID = "_scroll_id";
const char* FIELD_HITS = "hits";
const char* FIELD_INNER_HITS = "hits";
const char* FIELD_SOURCE = "_source";
const char* FIELD_TOTAL = "total";

ScrollParser::ScrollParser() {
_eos = false;
_total = 0;
}

ScrollParser::~ScrollParser() {
}


Status ScrollParser::parse(const std::string& scroll_result) {
rapidjson::Document document_node;
document_node.Parse<0>(scroll_result.c_str());
if (!document_node.HasMember(FIELD_SCROLL_ID)) {
return Status("maybe not a scroll request");
}
rapidjson::Value &scroll_node = document_node[FIELD_SCROLL_ID];
_scroll_id = scroll_node.GetString();
// { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
rapidjson::Value &outer_hits_node = document_node[FIELD_HITS];
rapidjson::Value &total = document_node[FIELD_TOTAL];
_total = total.GetInt();
if (_total == 0) {
_eos = true;
return Status::OK;
}
VLOG(1) << "es_scan_reader total hits: " << _total << " documents";
rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
if (!inner_hits_node.IsArray()) {
return Status("invalid response from elasticsearch");
}
_size = inner_hits_node.Size();
if (_size < _batch_size) {
_eos = true;
}
return Status::OK;
}

bool ScrollParser::has_next() {
return _eos;
}

bool ScrollParser::count() {
return _size;
}

std::string ScrollParser::get_scroll_id() {
return _scroll_id;
}
}
44 changes: 44 additions & 0 deletions be/src/util/es_scroll_parser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include<string>

namespace doris {

class Status;
class ScrollParser {

public:
ScrollParser();
~ScrollParser();
std::string get_scroll_id();
bool count();
uint32_t total();
Status parse(const std::string& scroll_result);
bool has_next();
void set_batch_size(int batch_size) {
_batch_size = batch_size;
}

private:
std::string _scroll_id;
bool _eos;
int _total;
int _size;
int _batch_size;
};
}
Loading