From 4ebbd4c28d12c087dfbe8992f4bb3f95dc1d51de Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 6 Feb 2021 23:26:34 +0800 Subject: [PATCH] [Load Parallel][3/3] Support parallel delta writer In the previous broker load, multiple OlapTableSinks would send data to the same LoadChannel, and because of the lock granularity problem, LoadChannel could only process these requests serially, which made it impossible to make full use of cluster resources. This CL modifies the related locks so that LoadChannel can process these requests in parallel. In the test, with a size of 20G, the load speed of 334 million rows of data in 3 nodes has been increased from 9min to 5min, and after enabling 2 concurrency, it can be increased to 3min. Also modify the profile of load job. --- be/src/exec/tablet_sink.cpp | 37 ++++++++-- be/src/exec/tablet_sink.h | 26 +++++-- be/src/olap/delta_writer.cpp | 60 +++++++++++++-- be/src/olap/delta_writer.h | 11 ++- be/src/olap/olap_define.h | 2 + .../rowset/unique_rowset_id_generator.cpp | 1 + be/src/olap/tablet_manager.cpp | 2 + be/src/runtime/load_channel_mgr.cpp | 3 +- be/src/runtime/load_channel_mgr.h | 3 +- be/src/runtime/tablets_channel.cpp | 74 +++++++++---------- be/src/service/internal_service.cpp | 4 +- be/test/runtime/load_channel_mgr_test.cpp | 20 +++-- .../java/org/apache/doris/qe/Coordinator.java | 4 +- 13 files changed, 173 insertions(+), 74 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a8f84b53362dd3..949e0990eb5493 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) { _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); _convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); - _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); + _non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile, "NonBlockingSendWorkTime", "NonBlockingSendTime"); + _serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime", "NonBlockingSendWorkTime"); + _total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime"); + _max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime"); + _add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); + _num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels", TUnit::UNIT); _load_mem_limit = state->get_load_mem_limit(); // open all channels @@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // BE id -> add_batch method counter std::unordered_map node_add_batch_counter_map; int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, - actual_consume_ns = 0; + actual_consume_ns = 0, total_add_batch_exec_time_ns = 0, + max_add_batch_exec_time_ns = 0, + total_add_batch_num = 0, num_node_channels = 0; { SCOPED_TIMER(_close_timer); for (auto index_channel : _channels) { index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); }); + num_node_channels += index_channel->num_node_channels(); } for (auto index_channel : _channels) { + int64_t add_batch_exec_time = 0; index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, - &queue_push_lock_ns, - &actual_consume_ns](NodeChannel* ch) { + &queue_push_lock_ns, &actual_consume_ns, + &total_add_batch_exec_time_ns, &add_batch_exec_time, + &total_add_batch_num](NodeChannel* ch) { auto s = ch->close_wait(state); if (!s.ok()) { // 'status' will store the last non-ok status of all channels @@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, &queue_push_lock_ns, - &actual_consume_ns); + &actual_consume_ns, &total_add_batch_exec_time_ns, + &add_batch_exec_time, &total_add_batch_num); }); + + if (add_batch_exec_time > max_add_batch_exec_time_ns) { + max_add_batch_exec_time_ns = add_batch_exec_time; + } } } // TODO need to be improved @@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); + COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns); + COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns); + COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns); + COUNTER_SET(_add_batch_number, total_add_batch_num); + COUNTER_SET(_num_node_channels, num_node_channels); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); @@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: "; for (auto const& pair : node_add_batch_counter_map) { ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) - << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" - << pair.second.add_batch_num << ")} "; + << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); } else { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8d894c6da711da..4a4220278e4529 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -80,7 +80,8 @@ struct AddBatchCounter { template class ReusableClosure : public google::protobuf::Closure { public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} + ReusableClosure() : cid(INVALID_BTHREAD_ID) { + } ~ReusableClosure() { // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. join(); @@ -173,12 +174,17 @@ class NodeChannel { void time_report(std::unordered_map* add_batch_counter_map, int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, - int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { + int64_t* queue_push_lock_ns, int64_t* actual_consume_ns, + int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns, + int64_t* total_add_batch_num) { (*add_batch_counter_map)[_node_id] += _add_batch_counter; *serialize_batch_ns += _serialize_batch_ns; *mem_exceeded_block_ns += _mem_exceeded_block_ns; *queue_push_lock_ns += _queue_push_lock_ns; *actual_consume_ns += _actual_consume_ns; + *add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000); + *total_add_batch_exec_time_ns += *add_batch_exec_time_ns; + *total_add_batch_num += _add_batch_counter.add_batch_num; } int64_t node_id() const { return _node_id; } @@ -237,10 +243,10 @@ class NodeChannel { std::vector _tablet_commit_infos; AddBatchCounter _add_batch_counter; - std::atomic _serialize_batch_ns; - std::atomic _mem_exceeded_block_ns; - std::atomic _queue_push_lock_ns; - std::atomic _actual_consume_ns; + std::atomic _serialize_batch_ns{0}; + std::atomic _mem_exceeded_block_ns{0}; + std::atomic _queue_push_lock_ns{0}; + std::atomic _actual_consume_ns{0}; }; class IndexChannel { @@ -262,6 +268,8 @@ class IndexChannel { void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } bool has_intolerable_failure(); + size_t num_node_channels() const { return _node_channels.size(); } + private: OlapTableSink* _parent; int64_t _index_id; @@ -382,12 +390,18 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _output_rows_counter = nullptr; RuntimeProfile::Counter* _filtered_rows_counter = nullptr; RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; RuntimeProfile::Counter* _convert_batch_timer = nullptr; RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; + RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _add_batch_number = nullptr; + RuntimeProfile::Counter* _num_node_channels = nullptr; // load mem limit is for remote load channel int64_t _load_mem_limit = -1; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1ed44fbf9f1b7b..ee80db1e2d5a00 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() { } OLAPStatus DeltaWriter::write(Tuple* tuple) { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + // The writer may be cancelled at any time by other thread. + // just return ERROR if writer is cancelled. + return OLAP_ERR_ALREADY_CANCELLED; + } + _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() { return _flush_token->submit(_mem_table); } -OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + std::lock_guard l(_lock); + if (!_is_init) { + // This writer is not initialized before flushing. Do nothing + // But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED, + // Because this method maybe called when trying to reduce mem consumption, + // and at that time, the writer may not be initialized yet and that is a normal case. + return OLAP_SUCCESS; + } + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + if (mem_consumption() == _mem_table->memory_usage()) { // equal means there is no memtable in flush queue, just flush this memtable VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " @@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { DCHECK(mem_consumption() > _mem_table->memory_usage()); // this means there should be at least one memtable in flush queue. } - // wait all memtables in flush queue to be flushed. + + if (need_wait) { + // wait all memtables in flush queue to be flushed. + RETURN_NOT_OK(_flush_token->wait()); + } + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { + std::lock_guard l(_lock); + if (!_is_init) { + // return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason + // as described in flush_memtable_and_wait() + return OLAP_SUCCESS; + } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } RETURN_NOT_OK(_flush_token->wait()); return OLAP_SUCCESS; } @@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() { } OLAPStatus DeltaWriter::close() { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { // if this delta writer is not initialized, but close() is called. // which means this tablet has no data loaded, but at least one tablet // in same partition has data loaded. @@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + RETURN_NOT_OK(_flush_memtable_async()); _mem_table.reset(); return OLAP_SUCCESS; } OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { + std::lock_guard l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); DCHECK_EQ(_mem_tracker->consumption(), 0); @@ -295,7 +343,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField l(_lock); + if (!_is_init || _is_cancelled) { return OLAP_SUCCESS; } _mem_table.reset(); @@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() { _flush_token->cancel(); } DCHECK_EQ(_mem_tracker->consumption(), 0); + _is_cancelled = true; return OLAP_SUCCESS; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index caa0483801ac3c..9cf59eb50609c3 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -21,6 +21,7 @@ #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" +#include "util/spinlock.h" namespace doris { @@ -73,12 +74,17 @@ class DeltaWriter { // submit current memtable to flush queue, and wait all memtables in flush queue // to be flushed. // This is currently for reducing mem consumption of this delta writer. - OLAPStatus flush_memtable_and_wait(); + // If need_wait is true, it will wait for all memtable in flush queue to be flushed. + // Otherwise, it will just put memtables to the flush queue and return. + OLAPStatus flush_memtable_and_wait(bool need_wait); int64_t partition_id() const; int64_t mem_consumption() const; + // Wait all memtable in flush queue to be flushed + OLAPStatus wait_flush(); + private: DeltaWriter(WriteRequest* req, const std::shared_ptr& parent, StorageEngine* storage_engine); @@ -92,6 +98,7 @@ class DeltaWriter { private: bool _is_init = false; + bool _is_cancelled = false; WriteRequest _req; TabletSharedPtr _tablet; RowsetSharedPtr _cur_rowset; @@ -106,6 +113,8 @@ class DeltaWriter { StorageEngine* _storage_engine; std::unique_ptr _flush_token; std::shared_ptr _mem_tracker; + + SpinLock _lock; }; } // namespace doris diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 75ad835afa8d17..ef1ec2efce8780 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -165,6 +165,8 @@ enum OLAPStatus { OLAP_ERR_TOO_MANY_TRANSACTIONS = -233, OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234, OLAP_ERR_TOO_MANY_VERSION = -235, + OLAP_ERR_NOT_INITIALIZED = -236, + OLAP_ERR_ALREADY_CANCELLED = -237, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/rowset/unique_rowset_id_generator.cpp b/be/src/olap/rowset/unique_rowset_id_generator.cpp index 71352ca848c58e..c21b8caf2f54da 100644 --- a/be/src/olap/rowset/unique_rowset_id_generator.cpp +++ b/be/src/olap/rowset/unique_rowset_id_generator.cpp @@ -19,6 +19,7 @@ #include "util/doris_metrics.h" #include "util/spinlock.h" +#include "util/stack_util.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 5a2fb5a9093aa3..18ef1132769cab 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) { + // the path like: /data/14/10080/964828783/ static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); // match tablet schema hash data path, for example, the path is /data/1/16791/29998 // 1 is shard id , 16791 is tablet id, 29998 is schema hash @@ -651,6 +652,7 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, } bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { + // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); string id_str; bool ret = RE2::PartialMatch(path, re, &id_str); diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index c2feebb2f38951..2f320d0c3175f4 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { static void dummy_deleter(const CacheKey& key, void* value) {} Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns) { + google::protobuf::RepeatedPtrField* tablet_vec) { UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index f0ec6fab0738e0..450c8bf0e2d451 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -51,8 +51,7 @@ class LoadChannelMgr { Status open(const PTabletWriterOpenRequest& request); Status add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns); + google::protobuf::RepeatedPtrField* tablet_vec); // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 76c7166d827304..b2a32c0a925eb9 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -77,23 +77,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); - std::lock_guard l(_lock); - if (_state != kOpened) { - return _state == kFinished - ? _close_status - : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", - _key.to_string(), _state)); - } - auto next_seq = _next_seqs[params.sender_id()]; - // check packet - if (params.packet_seq() < next_seq) { - LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::OK(); - } else if (params.packet_seq() > next_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::InternalError("lost data packet"); + int64_t cur_seq; + { + std::lock_guard l(_lock); + if (_state != kOpened) { + return _state == kFinished + ? _close_status + : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", + _key.to_string(), _state)); + } + cur_seq = _next_seqs[params.sender_id()]; + // check packet + if (params.packet_seq() < cur_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::OK(); + } else if (params.packet_seq() > cur_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::InternalError("lost data packet"); + } } RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get()); @@ -115,7 +118,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { return Status::InternalError(err_msg); } } - _next_seqs[params.sender_id()]++; + + { + std::lock_guard l(_lock); + _next_seqs[params.sender_id()] = cur_seq + 1; + } return Status::OK(); } @@ -183,29 +190,20 @@ Status TabletsChannel::reduce_mem_usage() { // therefore it's possible for reduce_mem_usage() to be called right after close() return _close_status; } - // find tablet writer with largest mem consumption - int64_t max_consume = 0L; - DeltaWriter* writer = nullptr; - for (auto& it : _tablet_writers) { - if (it.second->mem_consumption() > max_consume) { - max_consume = it.second->mem_consumption(); - writer = it.second; - } - } - if (writer == nullptr || max_consume == 0) { - // barely not happend, just return OK - return Status::OK(); + // Flush all memtables + for (auto& it : _tablet_writers) { + it.second->flush_memtable_and_wait(false); } - VLOG_NOTICE << "pick the delte writer to flush, with mem consumption: " << max_consume - << ", channel key: " << _key; - OLAPStatus st = writer->flush_memtable_and_wait(); - if (st != OLAP_SUCCESS) { - // flush failed, return error - std::stringstream ss; - ss << "failed to reduce mem consumption by flushing memtable. err: " << st; - return Status::InternalError(ss.str()); + for (auto& it : _tablet_writers) { + OLAPStatus st = it.second->wait_flush(); + if (st != OLAP_SUCCESS) { + // flush failed, return error + std::stringstream ss; + ss << "failed to reduce mem consumption by flushing memtable. err: " << st; + return Status::InternalError(ss.str()); + } } return Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7c85654600c774..51480fbb038116 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -97,11 +97,10 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr _tablet_worker_pool.offer([request, response, done, this]() { brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; - int64_t wait_lock_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); auto st = _exec_env->load_channel_mgr()->add_batch( - *request, response->mutable_tablet_vec(), &wait_lock_time_ns); + *request, response->mutable_tablet_vec()); if (!st.ok()) { LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() << ", id=" << request->id() << ", index_id=" << request->index_id() @@ -110,7 +109,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr st.to_protobuf(response->mutable_status()); } response->set_execution_time_us(execution_time_ns / 1000); - response->set_wait_lock_time_us(wait_lock_time_ns / 1000); }); } diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index 54b0d3fedae5ce..80e03adb5820a4 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -85,7 +85,11 @@ OLAPStatus DeltaWriter::cancel() { return OLAP_SUCCESS; } -OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { return OLAP_SUCCESS; } @@ -246,7 +250,7 @@ TEST_F(LoadChannelMgrTest, normal) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -413,7 +417,7 @@ TEST_F(LoadChannelMgrTest, add_failed) { row_batch.serialize(request.mutable_row_batch()); add_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -503,7 +507,7 @@ TEST_F(LoadChannelMgrTest, close_failed) { row_batch.serialize(request.mutable_row_batch()); close_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); // even if delta close failed, the return status is still ok, but tablet_vec is empty ASSERT_TRUE(st.ok()); @@ -591,7 +595,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -677,10 +681,10 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec1; - auto st = mgr.add_batch(request, &tablet_vec1, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec1); ASSERT_TRUE(st.ok()); google::protobuf::RepeatedPtrField tablet_vec2; - st = mgr.add_batch(request, &tablet_vec2, &wait_lock_time_ns); + st = mgr.add_batch(request, &tablet_vec2); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -693,7 +697,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { request.set_eos(true); request.set_packet_seq(0); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c0fba828b7413a..64295d594923dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1069,6 +1069,8 @@ private void computeFragmentHosts() throws Exception { List> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); + LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); + for (List scanRangeParams : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); @@ -1085,7 +1087,7 @@ private void computeFragmentHosts() throws Exception { throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostport, backendIdRef.getRef()); - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); }