Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,12 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure.
DEFINE_Int32(group_commit_max_queue_size, "65536");

// Ingest binlog work pool size: -1 disables the pool, 0 uses the hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");

// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,12 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

// Ingest binlog work pool size: -1 disables the pool, 0 uses the hardware concurrency
DECLARE_Int32(ingest_binlog_work_pool_size);

// Download binlog rate limit, unit is KB/s, 0 means no limit
DECLARE_Int32(download_binlog_rate_limit_kbs);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
29 changes: 21 additions & 8 deletions be/src/http/action/download_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,20 @@
#include "runtime/exec_env.h"

namespace doris {

const std::string FILE_PARAMETER = "file";
const std::string TOKEN_PARAMETER = "token";

DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
int32_t num_workers)
: _exec_env(exec_env), _download_type(NORMAL), _num_workers(num_workers) {
namespace {
// Name of the request parameter that carries the absolute path of the file to download.
// No `static`: the enclosing anonymous namespace already gives internal linkage.
const std::string FILE_PARAMETER = "file";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'FILE_PARAMETER' is a static definition in anonymous namespace; static is redundant here [readability-static-definition-in-anonymous-namespace]

Suggested change
static const std::string FILE_PARAMETER = "file";
const std::string FILE_PARAMETER = "file";

// Name of the "token" request parameter.
// NOTE(review): its use is outside this view — presumably an auth token; confirm.
// No `static`: the enclosing anonymous namespace already gives internal linkage.
const std::string TOKEN_PARAMETER = "token";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'TOKEN_PARAMETER' is a static definition in anonymous namespace; static is redundant here [readability-static-definition-in-anonymous-namespace]

Suggested change
static const std::string TOKEN_PARAMETER = "token";
const std::string TOKEN_PARAMETER = "token";

// Name of the request parameter that handle_normal() reads to pick the download channel.
// No `static`: the enclosing anonymous namespace already gives internal linkage.
const std::string CHANNEL_PARAMETER = "channel";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'CHANNEL_PARAMETER' is a static definition in anonymous namespace; static is redundant here [readability-static-definition-in-anonymous-namespace]

Suggested change
static const std::string CHANNEL_PARAMETER = "channel";
const std::string CHANNEL_PARAMETER = "channel";

// Channel value for which handle_normal() sends the file through the rate limit group.
// No `static`: the enclosing anonymous namespace already gives internal linkage.
const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'CHANNEL_INGEST_BINLOG_TYPE' is a static definition in anonymous namespace; static is redundant here [readability-static-definition-in-anonymous-namespace]

Suggested change
static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";

} // namespace

DownloadAction::DownloadAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
const std::vector<std::string>& allow_dirs, int32_t num_workers)
: _exec_env(exec_env),
_download_type(NORMAL),
_num_workers(num_workers),
_rate_limit_group(std::move(rate_limit_group)) {
for (auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
Expand Down Expand Up @@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
if (is_dir) {
do_dir_response(file_param, req);
} else {
do_file_response(file_param, req);
const auto& channel = req->param(CHANNEL_PARAMETER);
bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
if (ingest_binlog) {
do_file_response(file_param, req, _rate_limit_group.get());
} else {
do_file_response(file_param, req);
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions be/src/http/action/download_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "http/http_handler.h"
#include "util/threadpool.h"

struct bufferevent_rate_limit_group;

namespace doris {

class ExecEnv;
Expand All @@ -36,8 +38,9 @@ class HttpRequest;
// We use parameter named 'file' to specify the static resource path, it is an absolute path.
class DownloadAction : public HttpHandler {
public:
DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
int32_t num_workers = 0);
DownloadAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
const std::vector<std::string>& allow_dirs, int32_t num_workers = 0);

// for load error
DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
Expand Down Expand Up @@ -67,6 +70,8 @@ class DownloadAction : public HttpHandler {
std::string _error_log_root_dir;
int32_t _num_workers;
std::unique_ptr<ThreadPool> _download_workers;

std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
}; // end class DownloadAction

} // end namespace doris
12 changes: 8 additions & 4 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <fmt/ranges.h>

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string_view>
#include <utility>
#include <vector>

#include "common/config.h"
Expand Down Expand Up @@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
}

/// handle get segment file, need tablet_id, rowset_id && index
void handle_get_segment_file(HttpRequest* req) {
void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
try {
Expand Down Expand Up @@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
do_file_response(segment_file_path, req);
do_file_response(segment_file_path, req, rate_limit_group);
}

void handle_get_rowset_meta(HttpRequest* req) {
Expand All @@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {

} // namespace

DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
DownloadBinlogAction::DownloadBinlogAction(
ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
: _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) {}

void DownloadBinlogAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();
Expand Down Expand Up @@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
if (method == "get_binlog_info") {
handle_get_binlog_info(req);
} else if (method == "get_segment_file") {
handle_get_segment_file(req);
handle_get_segment_file(req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(req);
} else {
Expand Down
7 changes: 6 additions & 1 deletion be/src/http/action/download_binlog_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "common/status.h"
#include "http/http_handler.h"

struct bufferevent_rate_limit_group;

namespace doris {

class ExecEnv;
class HttpRequest;

class DownloadBinlogAction : public HttpHandler {
public:
DownloadBinlogAction(ExecEnv* exec_env);
DownloadBinlogAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group);
virtual ~DownloadBinlogAction() = default;

void handle(HttpRequest* req) override;
Expand All @@ -40,6 +44,7 @@ class DownloadBinlogAction : public HttpHandler {

private:
ExecEnv* _exec_env;
std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
};

} // namespace doris
58 changes: 31 additions & 27 deletions be/src/http/ev_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* param) {
// Builds a server on `port` with `num_workers` workers, using the backend's
// service bind address as host. One event_base per worker is created here and
// stored in _event_bases, so start() only has to look them up.
EvHttpServer::EvHttpServer(int port, int num_workers)
        : _port(port), _num_workers(num_workers), _real_port(0) {
    _host = BackendOptions::get_service_bind_address();

    // libevent threading setup; see the libevent evthread documentation.
    evthread_use_pthreads();
    DCHECK_GT(_num_workers, 0);
    _event_bases.resize(_num_workers);
    for (auto& slot : _event_bases) {
        // The shared_ptr owns the base and releases it through event_base_free.
        auto release_base = [](event_base* raw) { event_base_free(raw); };
        std::shared_ptr<event_base> created(event_base_new(), release_base);
        CHECK(created != nullptr) << "Couldn't create an event_base.";
        std::lock_guard lock(_event_bases_lock);
        slot = created;
    }
}

EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
Expand All @@ -107,34 +117,28 @@ void EvHttpServer::start() {
.set_min_threads(_num_workers)
.set_max_threads(_num_workers)
.build(&_workers));

evthread_use_pthreads();
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
CHECK(_workers->submit_func([this, i]() {
std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
event_base_free(base);
});
CHECK(base != nullptr) << "Couldn't create an event_base.";
{
std::lock_guard<std::mutex> lock(_event_bases_lock);
_event_bases[i] = base;
}

/* Create a new evhttp object to handle requests. */
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";

auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;

evhttp_set_newreqcb(http.get(), on_connection, this);
evhttp_set_gencb(http.get(), on_request, this);

event_base_dispatch(base.get());
})
.ok());
auto status = _workers->submit_func([this, i]() {
std::shared_ptr<event_base> base;
{
std::lock_guard lock(_event_bases_lock);
base = _event_bases[i];
}

/* Create a new evhttp object to handle requests. */
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";

auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;

evhttp_set_newreqcb(http.get(), on_connection, this);
evhttp_set_gencb(http.get(), on_request, this);

event_base_dispatch(base.get());
});
CHECK(status.ok());
}
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/http/ev_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class EvHttpServer {
// get real port
int get_real_port() const { return _real_port; }

// Returns a copy of the event_base list, taken while holding _event_bases_lock.
std::vector<std::shared_ptr<event_base>> get_event_bases() {
    std::lock_guard guard(_event_bases_lock);
    auto bases_copy = _event_bases;
    return bases_copy;
}

private:
Status _bind();
HttpHandler* _find_handler(HttpRequest* req);
Expand Down
13 changes: 10 additions & 3 deletions be/src/http/http_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "http/http_channel.h"

#include <event2/buffer.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'event2/buffer.h' file not found [clang-diagnostic-error]

#include <event2/buffer.h>
         ^

#include <event2/bufferevent.h>
#include <event2/http.h>

#include <algorithm>
Expand Down Expand Up @@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
evbuffer_free(evb);
}

void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {
void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size,
bufferevent_rate_limit_group* rate_limit_group) {
auto evb = evbuffer_new();
evbuffer_add_file(evb, fd, off, size);
evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
default_reason(HttpStatus::OK).c_str(), evb);
auto* evhttp_request = request->get_evhttp_request();
if (rate_limit_group) {
auto* evhttp_connection = evhttp_request_get_connection(evhttp_request);
auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection);
bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
}
evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
evbuffer_free(evb);
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/http/http_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "http/http_status.h"

struct bufferevent_rate_limit_group;
namespace doris {

class HttpRequest;
Expand All @@ -43,7 +44,8 @@ class HttpChannel {

static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content);

static void send_file(HttpRequest* request, int fd, size_t off, size_t size);
static void send_file(HttpRequest* request, int fd, size_t off, size_t size,
bufferevent_rate_limit_group* rate_limit_group = nullptr);

static bool compress_content(const std::string& accept_encoding, const std::string& input,
std::string* output);
Expand Down
5 changes: 3 additions & 2 deletions be/src/http/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) {
return "";
}

void do_file_response(const std::string& file_path, HttpRequest* req) {
void do_file_response(const std::string& file_path, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group) {
if (file_path.find("..") != std::string::npos) {
LOG(WARNING) << "Not allowed to read relative path: " << file_path;
HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
Expand Down Expand Up @@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, HttpRequest* req) {
return;
}

HttpChannel::send_file(req, fd, 0, file_size);
HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
}

void do_dir_response(const std::string& dir_path, HttpRequest* req) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/http/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "common/utils.h"
#include "http/http_request.h"

struct bufferevent_rate_limit_group;

namespace doris {

struct AuthInfo;
Expand All @@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa

bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);

void do_file_response(const std::string& dir_path, HttpRequest* req);
void do_file_response(const std::string& dir_path, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group = nullptr);

void do_dir_response(const std::string& dir_path, HttpRequest* req);

Expand Down
Loading