11 changes: 11 additions & 0 deletions be/src/common/config.h
@@ -621,6 +621,17 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// else we will call sync method
CONF_mBool(runtime_filter_use_async_rpc, "true");

// max send batch parallelism for OlapTableSink
// The send_batch_parallelism value set by the user must not exceed max_send_batch_parallelism_per_job;
// if it does, send_batch_parallelism is clamped to max_send_batch_parallelism_per_job.
CONF_mInt32(max_send_batch_parallelism_per_job, "5");
CONF_Validator(max_send_batch_parallelism_per_job, [](const int config) -> bool { return config >= 1; });

// thread num of the send batch thread pool
CONF_Int32(send_batch_thread_pool_thread_num, "64");
// queue size of the send batch thread pool
CONF_Int32(send_batch_thread_pool_queue_size, "102400");

} // namespace config

} // namespace doris
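The two knobs above interact: the per-job cap is enforced later in OlapTableSink::open() by clamping the user-requested parallelism. A minimal sketch of that rule, with illustrative names (the patch itself uses the MIN macro):

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the clamping rule: the effective parallelism is the value the
// user requested, capped by max_send_batch_parallelism_per_job.
int32_t effective_send_batch_parallelism(int32_t requested, int32_t per_job_max) {
    return std::min(requested, per_job_max);
}

// e.g. effective_send_batch_parallelism(8, 5) == 5 with the default cap of 5
```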
123 changes: 68 additions & 55 deletions be/src/exec/tablet_sink.cpp
@@ -32,6 +32,7 @@
#include "util/debug/sanitizer_scopes.h"
#include "util/monotime.h"
#include "util/time.h"
#include "util/threadpool.h"
#include "util/uid_util.h"

namespace doris {
@@ -194,11 +195,10 @@ Status NodeChannel::open_wait() {

if (result.has_execution_time_us()) {
_add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
_add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us();
_add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
_add_batch_counter.add_batch_num++;
}
});

return status;
}

@@ -342,68 +342,75 @@ void NodeChannel::cancel() {
request.release_id();
}

int NodeChannel::try_send_and_fetch_status() {
int NodeChannel::try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
auto st = none_of({_cancelled, _send_finished});
if (!st.ok()) {
return 0;
}

if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) {
SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
AddBatchReq send_batch;
{
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::lock_guard<std::mutex> l(_pending_batches_lock);
DCHECK(!_pending_batches.empty());
send_batch = std::move(_pending_batches.front());
_pending_batches.pop();
_pending_batches_num--;
bool is_finished = true;
if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 &&
_last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
auto s = thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this));
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
}
}
return _send_finished ? 0 : 1;
}

auto row_batch = std::move(send_batch.first);
auto request = std::move(send_batch.second); // doesn't need to be saved in heap

// tablet_ids has already been set when adding rows
request.set_packet_seq(_next_packet_seq);
if (row_batch->num_rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
row_batch->serialize(request.mutable_row_batch());
}
void NodeChannel::try_send_batch() {
SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
AddBatchReq send_batch;
{
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::lock_guard<std::mutex> l(_pending_batches_lock);
DCHECK(!_pending_batches.empty());
send_batch = std::move(_pending_batches.front());
_pending_batches.pop();
_pending_batches_num--;
}

_add_batch_closure->reset();
int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel();
return 0;
} else {
remain_ms = _min_rpc_timeout_ms;
}
}
_add_batch_closure->cntl.set_timeout_ms(remain_ms);
if (config::tablet_writer_ignore_eovercrowded) {
_add_batch_closure->cntl.ignore_eovercrowded();
}
auto row_batch = std::move(send_batch.first);
auto request = std::move(send_batch.second); // doesn't need to be saved in heap

if (request.eos()) {
for (auto pid : _parent->_partition_ids) {
request.add_partition_ids(pid);
}
// tablet_ids has already been set when adding rows
request.set_packet_seq(_next_packet_seq);
if (row_batch->num_rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
row_batch->serialize(request.mutable_row_batch());
}

// eos request must be the last request
_add_batch_closure->end_mark();
_send_finished = true;
DCHECK(_pending_batches_num == 0);
_add_batch_closure->reset();
int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel();
} else {
remain_ms = _min_rpc_timeout_ms;
}
}
_add_batch_closure->cntl.set_timeout_ms(remain_ms);
if (config::tablet_writer_ignore_eovercrowded) {
_add_batch_closure->cntl.ignore_eovercrowded();
}

_add_batch_closure->set_in_flight();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);
if (request.eos()) {
for (auto pid : _parent->_partition_ids) {
request.add_partition_ids(pid);
}

_next_packet_seq++;
// eos request must be the last request
_add_batch_closure->end_mark();
_send_finished = true;
DCHECK(_pending_batches_num == 0);
}

return _send_finished ? 0 : 1;
_add_batch_closure->set_in_flight();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);

_next_packet_seq++;
_last_patch_processed_finished = true;
}

Status NodeChannel::none_of(std::initializer_list<bool> vars) {
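The refactor above is the heart of the patch: the send logic moves out of the polling loop into try_send_batch(), which runs on a shared thread pool, and the _last_patch_processed_finished flag guarantees each channel has at most one send task queued or running at a time. A reduced sketch of that single-task-in-flight guard, with illustrative names:

```cpp
#include <atomic>
#include <functional>

// SubmitFn stands in for ThreadPoolToken::submit_func in this sketch.
using SubmitFn = std::function<void(std::function<void()>)>;

class Channel {
public:
    // Called repeatedly by the polling thread.
    void maybe_submit(const SubmitFn& submit) {
        bool idle = true;
        // Only the caller that flips _idle from true to false enqueues a task,
        // so at most one send task per channel exists in the pool.
        if (_idle.compare_exchange_strong(idle, false)) {
            submit([this] { do_send(); });
        }
    }

private:
    void do_send() {
        // ... pop one pending batch, serialize it, issue the async rpc ...
        _idle = true;  // re-arm so the poller may enqueue the next batch
    }

    std::atomic<bool> _idle{true};
};
```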
@@ -528,6 +535,9 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
} else {
_load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
}
if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) {
_send_batch_parallelism = table_sink.send_batch_parallelism;
}

return Status::OK();
}
@@ -675,7 +685,9 @@ Status OlapTableSink::open(RuntimeState* state) {
return Status::InternalError(ss.str());
}
}

int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job);
_send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
RETURN_IF_ERROR(Thread::create(
"OlapTableSink", "send_batch_process", [this]() { this->_send_batch_process(); },
&_sender_thread));
@@ -815,10 +827,11 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
// print a log of the add batch time of all nodes, for easy tracing of load performance
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/num: ";
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/wait execution time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
<< ")(" << pair.second.add_batch_num << ")} ";
<< ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
<< pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
@@ -1006,8 +1019,8 @@ void OlapTableSink::_send_batch_process() {
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) {
running_channels_num += ch->try_send_and_fetch_status();
index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) {
running_channels_num += ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
});
}

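The token created in open() is what turns send_batch_parallelism into an actual limit: a CONCURRENT token schedules its tasks on the shared process-wide pool but never runs more than max_concurrency of them at once. A hedged sketch of that pattern, assuming the Kudu-derived ThreadPool API in util/threadpool.h:

```cpp
#include <memory>

#include "common/status.h"
#include "util/threadpool.h"

// Sketch: one shared pool per process, one token per load job.
Status run_job_sends(ThreadPool* shared_pool, int32_t parallelism) {
    std::unique_ptr<ThreadPoolToken> token =
            shared_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, parallelism);
    for (int i = 0; i < 100; ++i) {
        // At most `parallelism` of these run concurrently, no matter how many
        // threads the underlying pool owns.
        RETURN_IF_ERROR(token->submit_func([i]() { /* send batch i */ }));
    }
    token->wait();  // block until every submitted task has finished
    return Status::OK();
}
```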
16 changes: 12 additions & 4 deletions be/src/exec/tablet_sink.h
@@ -44,6 +44,8 @@ class Bitmap;
class MemTracker;
class RuntimeProfile;
class RowDescriptor;
class ThreadPool;
class ThreadPoolToken;
class Tuple;
class TupleDescriptor;
class ExprContext;
@@ -58,12 +60,12 @@ struct AddBatchCounter {
// total execution time of an add_batch rpc
int64_t add_batch_execution_time_us = 0;
// waiting time of an add_batch rpc before it is executed
int64_t add_batch_wait_lock_time_us = 0;
int64_t add_batch_wait_execution_time_us = 0;
// number of add_batch calls
int64_t add_batch_num = 0;
AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
add_batch_execution_time_us += rhs.add_batch_execution_time_us;
add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us;
add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us;
add_batch_num += rhs.add_batch_num;
return *this;
}
@@ -169,7 +171,9 @@ class NodeChannel {
// 1: running, hasn't reached eos yet.
// Only one rpc is allowed in flight at a time.
// Make sure this function is called after open_wait().
int try_send_and_fetch_status();
int try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& thread_pool_token);

void try_send_batch();

void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
@@ -227,6 +231,8 @@ class NodeChannel {
// add batches finished means the last rpc has been responded to; used to check whether this channel can be closed
std::atomic<bool> _add_batches_finished{false};

std::atomic<bool> _last_patch_processed_finished{true};

bool _eos_is_produced{false}; // only for restricting producer behaviors

std::unique_ptr<RowDescriptor> _row_desc;
@@ -374,6 +380,7 @@ class OlapTableSink : public DataSink {

CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _sender_thread;
std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;

std::vector<DecimalV2Value> _max_decimalv2_val;
std::vector<DecimalV2Value> _min_decimalv2_val;
@@ -410,7 +417,8 @@ class OlapTableSink : public DataSink {
// the timeout of load channels opened by this tablet sink, in seconds
int64_t _load_channel_timeout_s = 0;

// True if this sink has been closed once
int32_t _send_batch_parallelism = 1;
// True if this sink has been closed once
bool _is_closed = false;
// Save the status of close() method
Status _close_status;
14 changes: 12 additions & 2 deletions be/src/http/action/stream_load.cpp
@@ -237,8 +237,10 @@ int StreamLoadAction::on_header(HttpRequest* req) {
HttpChannel::send_reply(req, str);
streaming_load_current_processing->increment(-1);
#ifndef BE_TEST
str = ctx->prepare_stream_load_record(str);
_sava_stream_load_record(ctx, str);
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_sava_stream_load_record(ctx, str);
}
#endif
return -1;
}
@@ -485,6 +487,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
}

if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
try {
request.__set_send_batch_parallelism(std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM)));
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid send_batch_parallelism format");
}
}

if (ctx->timeout_second != -1) {
request.__set_timeout(ctx->timeout_second);
}
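One caveat with the parse above: std::stoi throws std::out_of_range as well as std::invalid_argument, and only the latter is caught, so an absurdly large header value would propagate an exception. A slightly more defensive sketch of the same parse (illustrative, not what the patch does):

```cpp
#include <stdexcept>
#include <string>

// Sketch: parse a positive integer header value, rejecting malformed
// ("abc"), out-of-range ("99999999999999999999"), and non-positive input.
bool parse_send_batch_parallelism(const std::string& raw, int* out) {
    try {
        *out = std::stoi(raw);
    } catch (const std::invalid_argument&) {
        return false;
    } catch (const std::out_of_range&) {
        return false;
    }
    return *out >= 1;
}
```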
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
@@ -50,6 +50,8 @@ static const std::string HTTP_FUNCTION_COLUMN = "function_column";
static const std::string HTTP_SEQUENCE_COL = "sequence_col";
static const std::string HTTP_COMPRESS_TYPE = "compress_type";

static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism";

static const std::string HTTP_100_CONTINUE = "100-continue";

} // namespace doris
11 changes: 7 additions & 4 deletions be/src/olap/memtable_flush_executor.cpp
@@ -21,11 +21,13 @@

#include "olap/memtable.h"
#include "util/scoped_cleanup.h"
#include "util/time.h"

namespace doris {

std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
<< ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
<< ", flush count=" << stat.flush_count
<< ", flush bytes: " << stat.flush_size_bytes
<< ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
@@ -39,7 +41,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
// its reference count is not 0.
OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
RETURN_NOT_OK(_flush_status.load());
_flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable));
int64_t submit_task_time = MonotonicNanos();
_flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time));
return OLAP_SUCCESS;
}

@@ -52,9 +55,9 @@ OLAPStatus FlushToken::wait() {
return _flush_status.load();
}

void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
_stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
SCOPED_CLEANUP({ memtable.reset(); });

// If previous flush has failed, return directly
if (_flush_status.load() != OLAP_SUCCESS) {
return;
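The capture-at-submit pattern used in FlushToken::submit generalizes to any queue: record a monotonic timestamp when the task is enqueued and subtract it when the task starts running, which yields pure queue-wait time. A generic sketch with std::chrono (the patch uses Doris' MonotonicNanos()):

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>

using Clock = std::chrono::steady_clock;

// Wrap a task so its queue-wait time is accumulated into *wait_ns.
std::function<void()> with_wait_timing(std::function<void()> task,
                                       std::atomic<int64_t>* wait_ns) {
    auto submitted = Clock::now();  // captured at enqueue time
    return [task = std::move(task), wait_ns, submitted]() {
        *wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
                            Clock::now() - submitted)
                            .count();
        task();  // the actual work starts only after the wait was measured
    };
}
```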
11 changes: 6 additions & 5 deletions be/src/olap/memtable_flush_executor.h
@@ -35,10 +35,11 @@ class MemTable;
// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
struct FlushStatistic {
int64_t flush_time_ns = 0;
int64_t flush_count = 0;
int64_t flush_size_bytes = 0;
int64_t flush_disk_size_bytes = 0;
std::atomic_uint64_t flush_time_ns = 0;
std::atomic_uint64_t flush_count = 0;
std::atomic_uint64_t flush_size_bytes = 0;
std::atomic_uint64_t flush_disk_size_bytes = 0;
std::atomic_uint64_t flush_wait_time_ns = 0;
};

std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
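The switch to std::atomic_uint64_t matters because a CONCURRENT flush token can run several _flush_memtable tasks at once, so these counters are now updated from multiple threads; plain int64_t increments would be a data race. A minimal illustration (not from the patch):

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

int main() {
    std::atomic_uint64_t counter{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < 4; ++t) {
        workers.emplace_back([&counter] {
            for (int i = 0; i < 100000; ++i) {
                counter += 1;  // atomic read-modify-write: no updates are lost
            }
        });
    }
    for (auto& w : workers) w.join();
    // counter == 400000 is guaranteed; a plain uint64_t would typically
    // end up lower because concurrent increments overwrite each other.
    return 0;
}
```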
@@ -68,7 +69,7 @@ class FlushToken {
const FlushStatistic& get_stats() const { return _stats; }

private:
void _flush_memtable(std::shared_ptr<MemTable> mem_table);
void _flush_memtable(std::shared_ptr<MemTable> mem_table, int64_t submit_task_time);

std::unique_ptr<ThreadPoolToken> _flush_token;

2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
@@ -117,6 +117,7 @@ class ExecEnv {
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
@@ -189,6 +190,7 @@ class ExecEnv {
PriorityThreadPool* _scan_thread_pool = nullptr;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

std::unique_ptr<ThreadPool> _send_batch_thread_pool;
PriorityThreadPool* _etl_thread_pool = nullptr;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
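The diff adds the pool member and its accessor but not the initialization, which presumably lives in exec_env_init.cpp. A sketch of how the pool would be built from the two new config knobs, assuming Doris' Kudu-derived ThreadPoolBuilder API:

```cpp
#include <memory>

#include "common/config.h"
#include "common/status.h"
#include "util/threadpool.h"

// Sketch: build the shared send-batch pool from the new config values.
Status init_send_batch_pool(std::unique_ptr<ThreadPool>* pool) {
    return ThreadPoolBuilder("SendBatchThreadPool")
            .set_min_threads(1)
            .set_max_threads(config::send_batch_thread_pool_thread_num)
            .set_max_queue_size(config::send_batch_thread_pool_queue_size)
            .build(pool);
}
```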