From 6c62ac25d9625010c6bffaf3f945ff20b415a4f6 Mon Sep 17 00:00:00 2001 From: Yi WU Date: Mon, 18 Apr 2022 12:22:41 +0800 Subject: [PATCH 1/5] adjust read unit of http to optimize stream load --- be/src/http/action/stream_load.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 2 +- thirdparty/patches/libevent.patch | 126 ++++++++++++++++++++--------- 3 files changed, 90 insertions(+), 42 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d0a1693440a750..accb6330002127 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -348,7 +348,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(4096); + auto bb = ByteBuffer::allocate(128 * 1024); auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); @@ -392,7 +392,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_header_type(ctx->header_type); request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared(1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared(2 * 1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d12bc85f2623eb..85888a464cf988 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_cxt->need_commit_self = true; stream_load_cxt->need_rollback = true; // total_length == -1 means read one message from pipe in once time, don't care the length. - auto pipe = std::make_shared(1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared(2 * 1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); stream_load_cxt->body_sink = pipe; diff --git a/thirdparty/patches/libevent.patch b/thirdparty/patches/libevent.patch index 83e426d62e95df..a545897cf15149 100644 --- a/thirdparty/patches/libevent.patch +++ b/thirdparty/patches/libevent.patch @@ -1,6 +1,75 @@ -diff -uprN a/http.c b/http.c ---- a/http.c 2020-07-05 20:02:46.000000000 +0800 -+++ b/http.c 2021-09-28 13:56:14.045159153 +0800 +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 676727f1..833fbf70 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -200,7 +200,7 @@ endif() + if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG})) + set(GNUC 1) + endif() +-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG})) ++if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC")) + set(MSVC 1) + endif() + +diff --git a/buffer.c b/buffer.c +index 3524b350..e5d97458 100644 +--- a/buffer.c ++++ b/buffer.c +@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen) + #define IOV_LEN_TYPE unsigned long + #endif + #endif +-#define NUM_READ_IOVEC 4 ++#define NUM_READ_IOVEC 8 + +-#define EVBUFFER_MAX_READ 4096 ++#define EVBUFFER_MAX_READ (128 * 1024) + + /** Helper function to figure out which space to use for reading data into + an evbuffer. Internal use only. +diff --git a/bufferevent_async.c b/bufferevent_async.c +index 40c7c5e8..c1624878 100644 +--- a/bufferevent_async.c ++++ b/bufferevent_async.c +@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva) + } + at_most = read_high - cur_size; + } else { +- at_most = 16384; /* FIXME totally magic. */ ++ at_most = 128 * 1024; /* FIXME totally magic. */ + } + + /* XXXX This over-commits. */ +diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c +index 25874968..9bc2b577 100644 +--- a/bufferevent_ratelim.c ++++ b/bufferevent_ratelim.c +@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) + } + + /* Default values for max_single_read & max_single_write variables. */ +-#define MAX_SINGLE_READ_DEFAULT 16384 ++#define MAX_SINGLE_READ_DEFAULT (128 * 1024) + #define MAX_SINGLE_WRITE_DEFAULT 16384 + + #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) +diff --git a/http-internal.h b/http-internal.h +index feaf436d..9f9b5ab5 100644 +--- a/http-internal.h ++++ b/http-internal.h +@@ -167,6 +167,8 @@ struct evhttp { + void *gencbarg; + struct bufferevent* (*bevcb)(struct event_base *, void *); + void *bevcbarg; ++ int (*newreqcb)(struct evhttp_request *req, void *); ++ void *newreqcbarg; + + struct event_base *base; + }; +diff --git a/http.c b/http.c +index 04f089bc..53951cba 100644 +--- a/http.c ++++ b/http.c @@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http, http->bevcbarg = cbarg; } @@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c /* * Request related functions */ -@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques +@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req) req->flags |= EVHTTP_REQ_NEEDS_FREE; return; } @@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c if (req->remote_host != NULL) mm_free(req->remote_host); -@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct +@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request *req, req->on_complete_cb_arg = cb_arg; } @@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c /* * Allows for inspection of the request URI */ -@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec +@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon) */ req->userdone = 1; @@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c evhttp_start_read_(evcon); -diff -uprN a/http-internal.h b/http-internal.h ---- a/http-internal.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/http-internal.h 2021-09-28 13:56:13.925151028 +0800 -@@ -167,6 +167,8 @@ struct evhttp { - void *gencbarg; - struct bufferevent* (*bevcb)(struct event_base *, void *); - void *bevcbarg; -+ int (*newreqcb)(struct evhttp_request *req, void *); -+ void *newreqcbarg; - - struct event_base *base; - }; -diff -uprN a/include/event2/http.h b/include/event2/http.h ---- a/include/event2/http.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/include/event2/http.h 2021-09-28 13:56:13.928151231 +0800 -@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt +diff --git a/include/event2/http.h b/include/event2/http.h +index 2a41303e..e80bab9a 100644 +--- a/include/event2/http.h ++++ b/include/event2/http.h +@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL + void evhttp_set_bevcb(struct evhttp *http, struct bufferevent *(*cb)(struct event_base *, void *), void *arg); - /** ++/** + Set a callback which allows the user to note or throttle incoming requests. + The requests are not populated with HTTP level information. They + are just associated to a connection. @@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h +void evhttp_set_newreqcb(struct evhttp *http, + int (*cb)(struct evhttp_request*, void *), void *arg); + -+/** + /** Adds a virtual host to the http server. - A virtual host is a newly initialized evhttp object that has request @@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL void evhttp_request_set_on_complete_cb(struct evhttp_request *req, void (*cb)(struct evhttp_request *, void *), void *cb_arg); @@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h /** Frees the request object and removes associated events. */ EVENT2_EXPORT_SYMBOL void evhttp_request_free(struct evhttp_request *req); -diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h ---- a/include/event2/http_struct.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/include/event2/http_struct.h 2021-09-28 13:56:13.928151231 +0800 +diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h +index 4bf5b1ff..0762cabd 100644 +--- a/include/event2/http_struct.h ++++ b/include/event2/http_struct.h @@ -142,6 +142,12 @@ struct { */ void (*on_complete_cb)(struct evhttp_request *, void *); @@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h }; #ifdef __cplusplus -diff -uprN a/CMakeLists.txt b/CMakeLists.txt ---- a/CMakeLists.txt 2020-07-05 20:02:46.000000000 +0800 -+++ b/CMakeLists.txt 2022-01-10 13:29:32.912883436 +0800 -@@ -200,6 +200,6 @@ endif() - if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG})) - set(GNUC 1) - endif() --if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG})) -+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC")) - set(MSVC 1) - endif() From 048245d790bce19c23ebcb16b654288d49024fee Mon Sep 17 00:00:00 2001 From: Yi WU Date: Wed, 4 May 2022 09:52:13 +0800 Subject: [PATCH 2/5] define kMaxPipeBufferedBytes to avoid using numbers in different places. --- be/src/http/action/stream_load.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/stream_load/stream_load_pipe.h | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index accb6330002127..b9e1ed87b69565 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -392,7 +392,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_header_type(ctx->header_type); request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared(2 * 1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared(kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 85888a464cf988..c553744b5d52b3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_cxt->need_commit_self = true; stream_load_cxt->need_rollback = true; // total_length == -1 means read one message from pipe in once time, don't care the length. - auto pipe = std::make_shared(2 * 1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared(kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); stream_load_cxt->body_sink = pipe; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 7872cab1e279b1..e05eb7454a5d79 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -29,11 +29,13 @@ namespace doris { +const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // StreamLoadPipe use to transfer data from producer to consumer // Data in pip is stored in chunks. class StreamLoadPipe : public MessageBodySink, public FileReader { public: - StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024, + StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, + size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, bool use_proto = false) : _buffered_bytes(0), _proto_buffered_bytes(0), From dd9c5fc5b9ee2b9b602cfb0b664845c8354ef986 Mon Sep 17 00:00:00 2001 From: Yi WU Date: Thu, 12 May 2022 22:32:44 +0800 Subject: [PATCH 3/5] clang format --- be/src/http/http_request.h | 4 +--- be/src/runtime/stream_load/stream_load_pipe.h | 4 ++-- be/test/util/threadpool_test.cpp | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 81085f7cceb049..1503e4303a1b69 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -72,9 +72,7 @@ class HttpRequest { void set_handler(HttpHandler* handler) { _handler = handler; } HttpHandler* handler() const { return _handler; } - struct evhttp_request* get_evhttp_request() const { - return _ev_req; - } + struct evhttp_request* get_evhttp_request() const { return _ev_req; } void* handler_ctx() const { return _handler_ctx; } void set_handler_ctx(void* ctx) { diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index e05eb7454a5d79..d793d1cbd62a38 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -35,8 +35,8 @@ const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, - size_t min_chunk_size = 64 * 1024, - int64_t total_length = -1, bool use_proto = false) + size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, + bool use_proto = false) : _buffered_bytes(0), _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index 33c9fe3817d769..eceda73f552b51 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -236,7 +236,7 @@ TEST_F(ThreadPoolTest, TestRace) { // so an cast is needed to use std::bind EXPECT_TRUE(_pool ->submit_func(std::bind( - (void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)) + (void(CountDownLatch::*)())(&CountDownLatch::count_down), &l)) .ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit From 1c72ac147197fce7fb50539b12f1e6ec4b429971 Mon Sep 17 00:00:00 2001 From: Yi WU Date: Thu, 19 May 2022 20:47:56 +0800 Subject: [PATCH 4/5] clang format --- be/src/http/http_request.h | 4 +++- be/test/util/threadpool_test.cpp | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 1503e4303a1b69..81085f7cceb049 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -72,7 +72,9 @@ class HttpRequest { void set_handler(HttpHandler* handler) { _handler = handler; } HttpHandler* handler() const { return _handler; } - struct evhttp_request* get_evhttp_request() const { return _ev_req; } + struct evhttp_request* get_evhttp_request() const { + return _ev_req; + } void* handler_ctx() const { return _handler_ctx; } void set_handler_ctx(void* ctx) { diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index eceda73f552b51..33c9fe3817d769 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -236,7 +236,7 @@ TEST_F(ThreadPoolTest, TestRace) { // so an cast is needed to use std::bind EXPECT_TRUE(_pool ->submit_func(std::bind( - (void(CountDownLatch::*)())(&CountDownLatch::count_down), &l)) + (void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)) .ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit From 2f6d9b1714f1a0e81350556fdb38b811e072e6f1 Mon Sep 17 00:00:00 2001 From: Yi WU Date: Thu, 19 May 2022 21:20:31 +0800 Subject: [PATCH 5/5] clang format --- be/test/util/threadpool_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index 33c9fe3817d769..eceda73f552b51 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -236,7 +236,7 @@ TEST_F(ThreadPoolTest, TestRace) { // so an cast is needed to use std::bind EXPECT_TRUE(_pool ->submit_func(std::bind( - (void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)) + (void(CountDownLatch::*)())(&CountDownLatch::count_down), &l)) .ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit