diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a8f84b53362dd3..949e0990eb5493 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) { _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); _convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); - _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); + _non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile, "NonBlockingSendWorkTime", "NonBlockingSendTime"); + _serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime", "NonBlockingSendWorkTime"); + _total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime"); + _max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime"); + _add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); + _num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels", TUnit::UNIT); _load_mem_limit = state->get_load_mem_limit(); // open all channels @@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // BE id -> add_batch method counter std::unordered_map node_add_batch_counter_map; int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, - actual_consume_ns = 0; + actual_consume_ns = 0, total_add_batch_exec_time_ns = 0, + max_add_batch_exec_time_ns = 0, + total_add_batch_num = 0, num_node_channels = 0; { SCOPED_TIMER(_close_timer); for (auto index_channel : _channels) { index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); }); + num_node_channels += index_channel->num_node_channels(); } for (auto index_channel : _channels) { + int64_t add_batch_exec_time = 0; index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, - &queue_push_lock_ns, - &actual_consume_ns](NodeChannel* ch) { + &queue_push_lock_ns, &actual_consume_ns, + &total_add_batch_exec_time_ns, &add_batch_exec_time, + &total_add_batch_num](NodeChannel* ch) { auto s = ch->close_wait(state); if (!s.ok()) { // 'status' will store the last non-ok status of all channels @@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, &queue_push_lock_ns, - &actual_consume_ns); + &actual_consume_ns, &total_add_batch_exec_time_ns, + &add_batch_exec_time, &total_add_batch_num); }); + + if (add_batch_exec_time > max_add_batch_exec_time_ns) { + max_add_batch_exec_time_ns = add_batch_exec_time; + } } } // TODO need to be improved @@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); + COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns); + COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns); + COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns); + COUNTER_SET(_add_batch_number, total_add_batch_num); + COUNTER_SET(_num_node_channels, num_node_channels); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); @@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: "; for (auto const& pair : node_add_batch_counter_map) { ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) - << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" - << pair.second.add_batch_num << ")} "; + << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); } else { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8d894c6da711da..4a4220278e4529 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -80,7 +80,8 @@ struct AddBatchCounter { template class ReusableClosure : public google::protobuf::Closure { public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} + ReusableClosure() : cid(INVALID_BTHREAD_ID) { + } ~ReusableClosure() { // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. join(); @@ -173,12 +174,17 @@ class NodeChannel { void time_report(std::unordered_map* add_batch_counter_map, int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, - int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { + int64_t* queue_push_lock_ns, int64_t* actual_consume_ns, + int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns, + int64_t* total_add_batch_num) { (*add_batch_counter_map)[_node_id] += _add_batch_counter; *serialize_batch_ns += _serialize_batch_ns; *mem_exceeded_block_ns += _mem_exceeded_block_ns; *queue_push_lock_ns += _queue_push_lock_ns; *actual_consume_ns += _actual_consume_ns; + *add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000); + *total_add_batch_exec_time_ns += *add_batch_exec_time_ns; + *total_add_batch_num += _add_batch_counter.add_batch_num; } int64_t node_id() const { return _node_id; } @@ -237,10 +243,10 @@ class NodeChannel { std::vector _tablet_commit_infos; AddBatchCounter _add_batch_counter; - std::atomic _serialize_batch_ns; - std::atomic _mem_exceeded_block_ns; - std::atomic _queue_push_lock_ns; - std::atomic _actual_consume_ns; + std::atomic _serialize_batch_ns{0}; + std::atomic _mem_exceeded_block_ns{0}; + std::atomic _queue_push_lock_ns{0}; + std::atomic _actual_consume_ns{0}; }; class IndexChannel { @@ -262,6 +268,8 @@ class IndexChannel { void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } bool has_intolerable_failure(); + size_t num_node_channels() const { return _node_channels.size(); } + private: OlapTableSink* _parent; int64_t _index_id; @@ -382,12 +390,18 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _output_rows_counter = nullptr; RuntimeProfile::Counter* _filtered_rows_counter = nullptr; RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; RuntimeProfile::Counter* _convert_batch_timer = nullptr; RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; + RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _add_batch_number = nullptr; + RuntimeProfile::Counter* _num_node_channels = nullptr; // load mem limit is for remote load channel int64_t _load_mem_limit = -1; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1ed44fbf9f1b7b..ee80db1e2d5a00 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() { } OLAPStatus DeltaWriter::write(Tuple* tuple) { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + // The writer may be cancelled at any time by other thread. + // just return ERROR if writer is cancelled. + return OLAP_ERR_ALREADY_CANCELLED; + } + _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() { return _flush_token->submit(_mem_table); } -OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + std::lock_guard l(_lock); + if (!_is_init) { + // This writer is not initialized before flushing. Do nothing + // But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED, + // Because this method maybe called when trying to reduce mem consumption, + // and at that time, the writer may not be initialized yet and that is a normal case. + return OLAP_SUCCESS; + } + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + if (mem_consumption() == _mem_table->memory_usage()) { // equal means there is no memtable in flush queue, just flush this memtable VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " @@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { DCHECK(mem_consumption() > _mem_table->memory_usage()); // this means there should be at least one memtable in flush queue. } - // wait all memtables in flush queue to be flushed. + + if (need_wait) { + // wait all memtables in flush queue to be flushed. + RETURN_NOT_OK(_flush_token->wait()); + } + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { + std::lock_guard l(_lock); + if (!_is_init) { + // return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason + // as described in flush_memtable_and_wait() + return OLAP_SUCCESS; + } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } RETURN_NOT_OK(_flush_token->wait()); return OLAP_SUCCESS; } @@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() { } OLAPStatus DeltaWriter::close() { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { // if this delta writer is not initialized, but close() is called. // which means this tablet has no data loaded, but at least one tablet // in same partition has data loaded. @@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + RETURN_NOT_OK(_flush_memtable_async()); _mem_table.reset(); return OLAP_SUCCESS; } OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { + std::lock_guard l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); DCHECK_EQ(_mem_tracker->consumption(), 0); @@ -295,7 +343,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField l(_lock); + if (!_is_init || _is_cancelled) { return OLAP_SUCCESS; } _mem_table.reset(); @@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() { _flush_token->cancel(); } DCHECK_EQ(_mem_tracker->consumption(), 0); + _is_cancelled = true; return OLAP_SUCCESS; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index caa0483801ac3c..9cf59eb50609c3 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -21,6 +21,7 @@ #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" +#include "util/spinlock.h" namespace doris { @@ -73,12 +74,17 @@ class DeltaWriter { // submit current memtable to flush queue, and wait all memtables in flush queue // to be flushed. // This is currently for reducing mem consumption of this delta writer. - OLAPStatus flush_memtable_and_wait(); + // If need_wait is true, it will wait for all memtable in flush queue to be flushed. + // Otherwise, it will just put memtables to the flush queue and return. + OLAPStatus flush_memtable_and_wait(bool need_wait); int64_t partition_id() const; int64_t mem_consumption() const; + // Wait all memtable in flush queue to be flushed + OLAPStatus wait_flush(); + private: DeltaWriter(WriteRequest* req, const std::shared_ptr& parent, StorageEngine* storage_engine); @@ -92,6 +98,7 @@ class DeltaWriter { private: bool _is_init = false; + bool _is_cancelled = false; WriteRequest _req; TabletSharedPtr _tablet; RowsetSharedPtr _cur_rowset; @@ -106,6 +113,8 @@ class DeltaWriter { StorageEngine* _storage_engine; std::unique_ptr _flush_token; std::shared_ptr _mem_tracker; + + SpinLock _lock; }; } // namespace doris diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 75ad835afa8d17..ef1ec2efce8780 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -165,6 +165,8 @@ enum OLAPStatus { OLAP_ERR_TOO_MANY_TRANSACTIONS = -233, OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234, OLAP_ERR_TOO_MANY_VERSION = -235, + OLAP_ERR_NOT_INITIALIZED = -236, + OLAP_ERR_ALREADY_CANCELLED = -237, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/rowset/unique_rowset_id_generator.cpp b/be/src/olap/rowset/unique_rowset_id_generator.cpp index 71352ca848c58e..c21b8caf2f54da 100644 --- a/be/src/olap/rowset/unique_rowset_id_generator.cpp +++ b/be/src/olap/rowset/unique_rowset_id_generator.cpp @@ -19,6 +19,7 @@ #include "util/doris_metrics.h" #include "util/spinlock.h" +#include "util/stack_util.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 5a2fb5a9093aa3..18ef1132769cab 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) { + // the path like: /data/14/10080/964828783/ static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); // match tablet schema hash data path, for example, the path is /data/1/16791/29998 // 1 is shard id , 16791 is tablet id, 29998 is schema hash @@ -651,6 +652,7 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, } bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { + // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); string id_str; bool ret = RE2::PartialMatch(path, re, &id_str); diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index c2feebb2f38951..2f320d0c3175f4 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { static void dummy_deleter(const CacheKey& key, void* value) {} Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns) { + google::protobuf::RepeatedPtrField* tablet_vec) { UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index f0ec6fab0738e0..450c8bf0e2d451 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -51,8 +51,7 @@ class LoadChannelMgr { Status open(const PTabletWriterOpenRequest& request); Status add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns); + google::protobuf::RepeatedPtrField* tablet_vec); // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 76c7166d827304..b2a32c0a925eb9 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -77,23 +77,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); - std::lock_guard l(_lock); - if (_state != kOpened) { - return _state == kFinished - ? _close_status - : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", - _key.to_string(), _state)); - } - auto next_seq = _next_seqs[params.sender_id()]; - // check packet - if (params.packet_seq() < next_seq) { - LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::OK(); - } else if (params.packet_seq() > next_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::InternalError("lost data packet"); + int64_t cur_seq; + { + std::lock_guard l(_lock); + if (_state != kOpened) { + return _state == kFinished + ? _close_status + : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", + _key.to_string(), _state)); + } + cur_seq = _next_seqs[params.sender_id()]; + // check packet + if (params.packet_seq() < cur_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::OK(); + } else if (params.packet_seq() > cur_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::InternalError("lost data packet"); + } } RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get()); @@ -115,7 +118,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { return Status::InternalError(err_msg); } } - _next_seqs[params.sender_id()]++; + + { + std::lock_guard l(_lock); + _next_seqs[params.sender_id()] = cur_seq + 1; + } return Status::OK(); } @@ -183,29 +190,20 @@ Status TabletsChannel::reduce_mem_usage() { // therefore it's possible for reduce_mem_usage() to be called right after close() return _close_status; } - // find tablet writer with largest mem consumption - int64_t max_consume = 0L; - DeltaWriter* writer = nullptr; - for (auto& it : _tablet_writers) { - if (it.second->mem_consumption() > max_consume) { - max_consume = it.second->mem_consumption(); - writer = it.second; - } - } - if (writer == nullptr || max_consume == 0) { - // barely not happend, just return OK - return Status::OK(); + // Flush all memtables + for (auto& it : _tablet_writers) { + it.second->flush_memtable_and_wait(false); } - VLOG_NOTICE << "pick the delte writer to flush, with mem consumption: " << max_consume - << ", channel key: " << _key; - OLAPStatus st = writer->flush_memtable_and_wait(); - if (st != OLAP_SUCCESS) { - // flush failed, return error - std::stringstream ss; - ss << "failed to reduce mem consumption by flushing memtable. err: " << st; - return Status::InternalError(ss.str()); + for (auto& it : _tablet_writers) { + OLAPStatus st = it.second->wait_flush(); + if (st != OLAP_SUCCESS) { + // flush failed, return error + std::stringstream ss; + ss << "failed to reduce mem consumption by flushing memtable. err: " << st; + return Status::InternalError(ss.str()); + } } return Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7c85654600c774..51480fbb038116 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -97,11 +97,10 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr _tablet_worker_pool.offer([request, response, done, this]() { brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; - int64_t wait_lock_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); auto st = _exec_env->load_channel_mgr()->add_batch( - *request, response->mutable_tablet_vec(), &wait_lock_time_ns); + *request, response->mutable_tablet_vec()); if (!st.ok()) { LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() << ", id=" << request->id() << ", index_id=" << request->index_id() @@ -110,7 +109,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr st.to_protobuf(response->mutable_status()); } response->set_execution_time_us(execution_time_ns / 1000); - response->set_wait_lock_time_us(wait_lock_time_ns / 1000); }); } diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index 54b0d3fedae5ce..80e03adb5820a4 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -85,7 +85,11 @@ OLAPStatus DeltaWriter::cancel() { return OLAP_SUCCESS; } -OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { return OLAP_SUCCESS; } @@ -246,7 +250,7 @@ TEST_F(LoadChannelMgrTest, normal) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -413,7 +417,7 @@ TEST_F(LoadChannelMgrTest, add_failed) { row_batch.serialize(request.mutable_row_batch()); add_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -503,7 +507,7 @@ TEST_F(LoadChannelMgrTest, close_failed) { row_batch.serialize(request.mutable_row_batch()); close_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); // even if delta close failed, the return status is still ok, but tablet_vec is empty ASSERT_TRUE(st.ok()); @@ -591,7 +595,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -677,10 +681,10 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec1; - auto st = mgr.add_batch(request, &tablet_vec1, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec1); ASSERT_TRUE(st.ok()); google::protobuf::RepeatedPtrField tablet_vec2; - st = mgr.add_batch(request, &tablet_vec2, &wait_lock_time_ns); + st = mgr.add_batch(request, &tablet_vec2); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -693,7 +697,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { request.set_eos(true); request.set_packet_seq(0); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c0fba828b7413a..64295d594923dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1069,6 +1069,8 @@ private void computeFragmentHosts() throws Exception { List> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); + LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); + for (List scanRangeParams : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); @@ -1085,7 +1087,7 @@ private void computeFragmentHosts() throws Exception { throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostport, backendIdRef.getRef()); - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); }