From db99e6ca32ff2ed31eba56c800d8df135a726ba6 Mon Sep 17 00:00:00 2001
From: morningman
Date: Wed, 9 Feb 2022 23:55:52 +0800
Subject: [PATCH 1/8] [fix](compatibility) Fix compatibility issue of PRowBatch

1. Set both tuple_offsets and new_tuple_offsets in PRowBatch for compatibility.
2. Set FE config `repair_slow_replica` to false by default, to avoid impacting
   the load process after upgrading. E.g., if there are only 2 replicas and one
   of them has a high version count, that replica will be set to bad after the
   upgrade, and the load process will stop because only 1 replica is alive.
---
 be/src/runtime/row_batch.cpp                       | 12 ++++++++++--
 docs/en/administrator-guide/config/fe_config.md    |  4 ++--
 docs/zh-CN/administrator-guide/config/fe_config.md |  2 +-
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 139740b9c3786e..eb223d661d5d7e 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -254,7 +254,10 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
     // row_tuples
     _row_desc.to_protobuf(output_batch->mutable_row_tuples());
     // tuple_offsets: must clear before reserve
+    // TODO(cmy): tuple_offsets should be removed after v1.1.0; use new_tuple_offsets instead.
+    // Keeping tuple_offsets here is just for compatibility.
     output_batch->clear_tuple_offsets();
+    output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
     output_batch->clear_new_tuple_offsets();
     output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
     // is_compressed
@@ -279,7 +282,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
     int64_t offset = 0; // current offset into output_batch->tuple_data
     char* tuple_data = mutable_tuple_data->data();
     const auto& tuple_descs = _row_desc.tuple_descriptors();
-    const auto& mutable_tuple_offsets = output_batch->mutable_new_tuple_offsets();
+    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+    const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
@@ -288,10 +292,12 @@
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
                 mutable_tuple_offsets->Add(-1);
+                mutable_new_tuple_offsets->Add(-1);
                 continue;
             }
             // Record offset before creating copy (which increments offset and tuple_data)
-            mutable_tuple_offsets->Add(offset);
+            mutable_tuple_offsets->Add((int32_t) offset);
+            mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             CHECK_LE(offset, size);
         }
@@ -519,7 +525,9 @@ vectorized::Block RowBatch::convert_to_vec_block() const {
 size_t RowBatch::get_batch_size(const PRowBatch& batch) {
     size_t result = batch.tuple_data().size();
     result += batch.row_tuples().size() * sizeof(int32_t);
+    // TODO(cmy): remove batch.tuple_offsets
     result += batch.tuple_offsets().size() * sizeof(int32_t);
+    result += batch.new_tuple_offsets().size() * sizeof(int64_t);
     return result;
 }

diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 33eaf223847f07..0d679dcb61fd37 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -2096,7 +2096,7 @@ When there are a large number of replicas waiting to be balanced or repaired in
 
 ### repair_slow_replica
 
-Default: true
+Default: false
 
 IsMutable:true
 
@@ -2153,4 +2153,4 @@ Dynamically configured: true
 
 Only for Master FE: true
 
-The data size threshold used to judge whether replica is too large
\ No newline at end of file
+The data size threshold used to judge whether replica is too large
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index 6e73f402530da7..0e3c5191909a44 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2116,7 +2116,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 
 ### repair_slow_replica
 
-默认值:true
+默认值:false
 
 是否可以动态配置:true
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 21a84308804b1a..b25b5f35b916c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1572,7 +1572,7 @@ public class Config extends ConfigBase {
      * auto set the slowest compaction replica's status to bad
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static boolean repair_slow_replica = true;
+    public static boolean repair_slow_replica = false;
 
     /*
      * The relocation of a colocation group may involve a large number of tablets moving within the cluster.
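
Note on patch 1: the serialize() change is a dual-write compatibility bridge. During a
rolling upgrade the sender cannot know whether the receiver still reads the old int32
tuple_offsets or the new int64 new_tuple_offsets, so it fills both; the int32 copy is
truncated, which presumably only matters once tuple_data can exceed what int32 offsets
address. A minimal sketch of the pattern (RowBatchOffsets and add_offset are
illustrative stand-ins, not Doris code):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Stand-ins for the two repeated fields in PRowBatch: the deprecated
    // 32-bit tuple_offsets and the 64-bit new_tuple_offsets (the real code
    // uses protobuf RepeatedField, not std::vector).
    struct RowBatchOffsets {
        std::vector<int32_t> tuple_offsets;      // read by not-yet-upgraded peers
        std::vector<int64_t> new_tuple_offsets;  // read by upgraded peers
    };

    // Writing every offset into both fields keeps old and new receivers
    // working during a rolling upgrade; a NULL tuple is encoded as -1 in both.
    void add_offset(RowBatchOffsets* batch, int64_t offset) {
        batch->tuple_offsets.push_back(static_cast<int32_t>(offset));
        batch->new_tuple_offsets.push_back(offset);
    }

    int main() {
        RowBatchOffsets batch;
        add_offset(&batch, -1);   // NULL tuple
        add_offset(&batch, 128);  // a tuple at byte offset 128
        assert(batch.tuple_offsets[1] == 128 && batch.new_tuple_offsets[1] == 128);
        return 0;
    }

Once every node is on a release that reads new_tuple_offsets, the old field can be
dropped, which is exactly what the TODO(cmy) comments schedule for after v1.1.0.
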
From c456453e1980e07ad5d3bea6cd62f29eb9cabfa9 Mon Sep 17 00:00:00 2001
From: morningman
Date: Thu, 10 Feb 2022 19:28:55 +0800
Subject: [PATCH 2/8] change log

---
 be/src/olap/tablet_meta.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 249e554fd7a06c..6e6195d951e23b 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -601,8 +601,8 @@ void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
             for (const auto& it : temp.sub_predicates()) {
                 del_cond_str += it + ";";
             }
-            LOG(INFO) << "remove one del_pred. version=" << temp.version()
-                      << ", condition=" << del_cond_str;
+            VLOG_NOTICE << "remove one del_pred. version=" << temp.version()
+                        << ", condition=" << del_cond_str;
 
             // remove delete condition from PB
             _del_pred_array.SwapElements(ordinal, _del_pred_array.size() - 1);

From 3aa0017f6411650a14002c39712e940ac3b726e9 Mon Sep 17 00:00:00 2001
From: morningman
Date: Thu, 10 Feb 2022 19:49:09 +0800
Subject: [PATCH 3/8] fix routine load npe

---
 .../load/routineload/RoutineLoadManager.java | 52 +++++++++----------
 .../org/apache/doris/qe/QeProcessorImpl.java |  2 +-
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index f97d25e9b21af9..ae6d8b3c6d4a02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -123,14 +123,14 @@ public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
             throws UserException {
         // check load auth
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                                                                createRoutineLoadStmt.getDBName(),
-                                                                createRoutineLoadStmt.getTableName(),
-                                                                PrivPredicate.LOAD)) {
+                createRoutineLoadStmt.getDBName(),
+                createRoutineLoadStmt.getTableName(),
+                PrivPredicate.LOAD)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                                                ConnectContext.get().getQualifiedUser(),
-                                                ConnectContext.get().getRemoteIP(),
-                                                createRoutineLoadStmt.getDBName(),
-                                                createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    createRoutineLoadStmt.getDBName(),
+                    createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
         }
 
         RoutineLoadJob routineLoadJob = null;
@@ -220,13 +220,13 @@ public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
             throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
         }
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                                                                dbFullName,
-                                                                tableName,
-                                                                PrivPredicate.LOAD)) {
+                dbFullName,
+                tableName,
+                PrivPredicate.LOAD)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                                                ConnectContext.get().getQualifiedUser(),
-                                                ConnectContext.get().getRemoteIP(),
-                                                dbFullName + ": " + tableName);
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    dbFullName + ": " + tableName);
         }
         return routineLoadJob;
     }
@@ -336,10 +336,10 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
                         "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
                 false /* not replay */);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
-                         .add("current_state", routineLoadJob.getState())
-                         .add("user", ConnectContext.get().getQualifiedUser())
-                         .add("msg", "routine load job has been stopped by user")
-                         .build());
+                .add("current_state", routineLoadJob.getState())
+                .add("user", ConnectContext.get().getQualifiedUser())
+                .add("msg", "routine load job has been stopped by user")
+                .build());
     }
 
     public int getSizeOfIdToRoutineLoadTask() {
@@ -394,7 +394,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
                 }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
-                              beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
+                            beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
                 }
                 result = maxIdleSlotNum < idleTaskNum ? beId : result;
                 maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
@@ -426,7 +426,7 @@ public long getAvailableBeForTask(long previoudBeId, String clusterName) throws
         // 1. Find if the given BE id has available slots
         if (previoudBeId != -1L) {
             int idleTaskNum = 0;
-            if (beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
+            if (!beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
                 idleTaskNum = 0;
             } else if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
                 idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
@@ -594,7 +594,7 @@ public void cleanOldRoutineLoadJobs() {
                     iterator.remove();
 
                     RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
-                                                                              routineLoadJob.getState());
+                            routineLoadJob.getState());
                     Catalog.getCurrentCatalog().getEditLog().logRemoveRoutineLoadJob(operation);
                     LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
                             .add("end_timestamp", routineLoadJob.getEndTimestamp())
@@ -643,8 +643,8 @@ public void updateRoutineLoadJob() throws UserException {
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         unprotectedAddJob(routineLoadJob);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
-                         .add("msg", "replay create routine load job")
-                         .build());
+                .add("msg", "replay create routine load job")
+                .build());
     }
 
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
@@ -655,11 +655,11 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
             LOG.error("should not happened", e);
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
-                         .add("current_state", operation.getJobState())
-                         .add("msg", "replay change routine load job")
-                         .build());
+                .add("current_state", operation.getJobState())
+                .add("msg", "replay change routine load job")
+                .build());
     }
-    
+
     /**
      * Enter of altering a routine load job
     */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 449aa7fd63c4da..e3c9feabad9c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -184,7 +184,7 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
             result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
-            LOG.info("ReportExecStatus() runtime error, query {} does not exist", params.query_id);
+            LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
             return result;
         }
         try {
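
Note on patch 3: the inverted containsKey guard is the actual NPE fix. In the old
code, a previoudBeId missing from beIdToMaxConcurrentTasks fell through to
beIdToMaxConcurrentTasks.get(previoudBeId), whose null return throws a
NullPointerException when auto-unboxed; when the key was present, the old code
wrongly reported zero idle slots instead of computing them. A C++ analog of the
corrected lookup (idle_slots, max_tasks, and cur_tasks are illustrative stand-ins,
not the Java code above):

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    // Idle task slots on a backend, treating an unknown backend as having
    // none. An unguarded at() here would throw std::out_of_range, much as the
    // Java Map#get() returns a null Integer that throws NPE when unboxed.
    int64_t idle_slots(const std::unordered_map<int64_t, int64_t>& max_tasks,
                       const std::unordered_map<int64_t, int64_t>& cur_tasks,
                       int64_t be_id) {
        auto max_it = max_tasks.find(be_id);
        if (max_it == max_tasks.end()) {
            return 0;  // unknown backend: no slots, and no unchecked lookup below
        }
        auto cur_it = cur_tasks.find(be_id);
        int64_t running = (cur_it == cur_tasks.end()) ? 0 : cur_it->second;
        return max_it->second - running;
    }

    int main() {
        std::unordered_map<int64_t, int64_t> max_tasks = {{10001, 5}};
        std::unordered_map<int64_t, int64_t> cur_tasks = {{10001, 3}};
        std::cout << idle_slots(max_tasks, cur_tasks, 10001) << "\n";  // prints 2
        std::cout << idle_slots(max_tasks, cur_tasks, 99999) << "\n";  // prints 0, no throw
        return 0;
    }
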
From 64fd00758529aeb7d90bef41642eb81cb159fa85 Mon Sep 17 00:00:00 2001
From: morningman
Date: Fri, 11 Feb 2022 00:56:39 +0800
Subject: [PATCH 4/8] fix rpc block

---
 be/src/exec/tablet_sink.cpp | 24 +++++++++++++++---------
 be/src/exec/tablet_sink.h   | 12 ++++++------
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 2fe349294dd044..03f27a961ca500 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -183,12 +183,16 @@ Status NodeChannel::open_wait() {
 
     // add batch closure
     _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
-    _add_batch_closure->addFailedHandler([this]() {
+    _add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
         // If rpc failed, mark all tablets on this node channel as failed
        _index_channel->mark_as_failed(this, _add_batch_closure->cntl.ErrorText(), -1);
         Status st = _index_channel->check_intolerable_failure();
         if (!st.ok()) {
             _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+        } else if (is_last_rpc) {
+            // If this is the last rpc, we must set _add_batches_finished; otherwise
+            // the node channel's close_wait() will be blocked.
+            _add_batches_finished = true;
         }
     });
 
@@ -493,7 +497,7 @@ void NodeChannel::try_send_batch() {
             // eos request must be the last request
             _add_batch_closure->end_mark();
             _send_finished = true;
-            DCHECK(_pending_batches_num == 0);
+            CHECK(_pending_batches_num == 0) << _pending_batches_num;
         }
 
         if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
@@ -541,13 +545,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
-        std::vector<NodeChannel*> channels;
+        std::vector<std::shared_ptr<NodeChannel>> channels;
         for (auto& node_id : location->node_ids) {
-            NodeChannel* channel = nullptr;
+            std::shared_ptr<NodeChannel> channel;
             auto it = _node_channels.find(node_id);
             if (it == _node_channels.end()) {
-                channel =
-                        _parent->_pool->add(new NodeChannel(_parent, this, node_id, _schema_hash));
+                // NodeChannel is not added to _parent->_pool, because destroying a
+                // NodeChannel may take a long time waiting for rpc to finish, while
+                // the ObjectPool holds a spin lock when deleting objects.
+                channel = std::make_shared<NodeChannel>(_parent, this, node_id, _schema_hash);
                 _node_channels.emplace(node_id, channel);
             } else {
                 channel = it->second;
@@ -571,7 +577,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(tuple, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel, st.get_error_msg(), tablet_id);
+            mark_as_failed(channel.get(), st.get_error_msg(), tablet_id);
             // continue add row to other node, the error will be checked for every batch outside
         }
     }
@@ -586,7 +592,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(block_row, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel, st.get_error_msg(), tablet_id);
+            mark_as_failed(channel.get(), st.get_error_msg(), tablet_id);
         }
     }
 }
@@ -1216,7 +1222,7 @@ void OlapTableSink::_send_batch_process() {
 
             if (running_channels_num == 0) {
                 LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), "
-                             "consumer thread exit.";
+                             "sender thread exit. " << print_id(_load_id);
                 return;
             }
         } while (!_stop_background_threads_latch.wait_for(
" << print_id(_load_id); return; } } while (!_stop_background_threads_latch.wait_for( diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 10655882a2e6e2..37d1caf39b80d5 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -92,7 +92,7 @@ class ReusableClosure : public google::protobuf::Closure { static ReusableClosure* create() { return new ReusableClosure(); } - void addFailedHandler(std::function fn) { failed_handler = fn; } + void addFailedHandler(std::function fn) { failed_handler = fn; } void addSuccessHandler(std::function fn) { success_handler = fn; } void join() { @@ -126,7 +126,7 @@ class ReusableClosure : public google::protobuf::Closure { if (cntl.Failed()) { LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) << ", error_text=" << cntl.ErrorText(); - failed_handler(); + failed_handler(_is_last_rpc); } else { success_handler(result, _is_last_rpc); } @@ -140,7 +140,7 @@ class ReusableClosure : public google::protobuf::Closure { brpc::CallId cid; std::atomic _packet_in_flight {false}; std::atomic _is_last_rpc {false}; - std::function failed_handler; + std::function failed_handler; std::function success_handler; }; @@ -290,7 +290,7 @@ class IndexChannel { void for_each_node_channel(const std::function& func) { for (auto& it : _node_channels) { - func(it.second); + func(it.second.get()); } } @@ -310,9 +310,9 @@ class IndexChannel { int32_t _schema_hash; // BeId -> channel - std::unordered_map _node_channels; + std::unordered_map> _node_channels; // from tablet_id to backend channel - std::unordered_map> _channels_by_tablet; + std::unordered_map>> _channels_by_tablet; // from backend channel to tablet_id std::unordered_map> _tablets_by_channel; From a96a694c602f16b40b113dd86e680924ba3bfbd5 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 11 Feb 2022 10:06:34 +0800 Subject: [PATCH 5/8] fix ut --- be/test/vec/aggregate_functions/agg_test.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/test/vec/aggregate_functions/agg_test.cpp b/be/test/vec/aggregate_functions/agg_test.cpp index b0d3c0656415ab..9e63257e4bcfea 100644 --- a/be/test/vec/aggregate_functions/agg_test.cpp +++ b/be/test/vec/aggregate_functions/agg_test.cpp @@ -95,6 +95,9 @@ TEST(AggTest, topn_test) { "46,\"10\":37}"; ASSERT_EQ(result, expect_result); agg_function->destroy(place); + if (place) { + delete[] place; + } } } // namespace doris::vectorized From 6c23f246f60866e75cd3d3f9715ac0b3177ca948 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 12 Feb 2022 10:15:39 +0800 Subject: [PATCH 6/8] fix --- be/src/exec/tablet_sink.cpp | 38 ++++++++++++++++++------------------- be/src/exec/tablet_sink.h | 6 +++--- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 03f27a961ca500..590e1136d921dc 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -185,7 +185,7 @@ Status NodeChannel::open_wait() { _add_batch_closure = ReusableClosure::create(); _add_batch_closure->addFailedHandler([this](bool is_last_rpc) { // If rpc failed, mark all tablets on this node channel as failed - _index_channel->mark_as_failed(this, _add_batch_closure->cntl.ErrorText(), -1); + _index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1); Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); @@ -202,7 +202,7 @@ Status 
From 6c23f246f60866e75cd3d3f9715ac0b3177ca948 Mon Sep 17 00:00:00 2001
From: morningman
Date: Sat, 12 Feb 2022 10:15:39 +0800
Subject: [PATCH 6/8] fix

---
 be/src/exec/tablet_sink.cpp | 38 ++++++++++++++++++-------------------
 be/src/exec/tablet_sink.h   |  6 +++---
 2 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 03f27a961ca500..590e1136d921dc 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -185,7 +185,7 @@ Status NodeChannel::open_wait() {
     _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
         // If rpc failed, mark all tablets on this node channel as failed
-        _index_channel->mark_as_failed(this, _add_batch_closure->cntl.ErrorText(), -1);
+        _index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1);
         Status st = _index_channel->check_intolerable_failure();
         if (!st.ok()) {
             _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
@@ -202,7 +202,7 @@ Status NodeChannel::open_wait() {
         if (status.ok()) {
             // if has error tablet, handle them first
             for (auto& error : result.tablet_errors()) {
-                _index_channel->mark_as_failed(this, error.msg(), error.tablet_id());
+                _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), error.tablet_id());
             }
 
             Status st = _index_channel->check_intolerable_failure();
@@ -577,7 +577,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(tuple, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel.get(), st.get_error_msg(), tablet_id);
+            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
             // continue add row to other node, the error will be checked for every batch outside
         }
     }
@@ -592,14 +592,14 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(block_row, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel.get(), st.get_error_msg(), tablet_id);
+            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
         }
     }
 }
 
-void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& err,
+void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
                                   int64_t tablet_id) {
-    const auto& it = _tablets_by_channel.find(ch->node_id());
+    const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
     }
@@ -608,16 +608,16 @@ void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
     std::lock_guard l(_fail_lock);
     if (tablet_id == -1) {
         for (const auto the_tablet_id : it->second) {
-            _failed_channels[the_tablet_id].insert(ch->node_id());
-            _failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + ch->host());
+            _failed_channels[the_tablet_id].insert(node_id);
+            _failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + host);
             if (_failed_channels[the_tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
                 _intolerable_failure_status =
                         Status::InternalError(_failed_channels_msgs[the_tablet_id]);
             }
         }
     } else {
-        _failed_channels[tablet_id].insert(ch->node_id());
-        _failed_channels_msgs.emplace(tablet_id, err + ", host: " + ch->host());
+        _failed_channels[tablet_id].insert(node_id);
+        _failed_channels_msgs.emplace(tablet_id, err + ", host: " + host);
         if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
             _intolerable_failure_status =
                     Status::InternalError(_failed_channels_msgs[tablet_id]);
@@ -661,7 +661,7 @@ OlapTableSink::~OlapTableSink() {
     // OlapTableSink::_mem_tracker and its parents.
     // But their destructions are after OlapTableSink's.
     for (auto index_channel : _channels) {
-        index_channel->for_each_node_channel([](NodeChannel* ch) { ch->clear_all_batches(); });
+        index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_batches(); });
     }
 }
 
@@ -815,17 +815,17 @@ Status OlapTableSink::open(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
 
     for (auto index_channel : _channels) {
-        index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); });
+        index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->open(); });
     }
 
     for (auto index_channel : _channels) {
-        index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) {
+        index_channel->for_each_node_channel([&index_channel](const std::shared_ptr<NodeChannel>& ch) {
             auto st = ch->open_wait();
             if (!st.ok()) {
                 // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel.
                 // This phase will not fail due to a single tablet.
                 // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed.
-                index_channel->mark_as_failed(ch,
+                index_channel->mark_as_failed(ch->node_id(), ch->host(),
                                               fmt::format("{}, open failed, err: {}",
                                                           ch->channel_info(), st.get_error_msg()),
                                               -1);
@@ -941,7 +941,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         {
             SCOPED_TIMER(_close_timer);
             for (auto index_channel : _channels) {
-                index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
+                index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->mark_close(); });
                 num_node_channels += index_channel->num_node_channels();
             }
 
@@ -951,10 +951,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                         [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
                          &mem_exceeded_block_ns, &queue_push_lock_ns, &actual_consume_ns,
                          &total_add_batch_exec_time_ns, &add_batch_exec_time,
-                         &total_add_batch_num](NodeChannel* ch) {
+                         &total_add_batch_num](const std::shared_ptr<NodeChannel>& ch) {
                             auto s = ch->close_wait(state);
                             if (!s.ok()) {
-                                index_channel->mark_as_failed(ch, s.get_error_msg(), -1);
+                                index_channel->mark_as_failed(ch->node_id(), ch->host(), s.get_error_msg(), -1);
                                 LOG(WARNING) << ch->channel_info()
                                              << ", close channel failed, err: " << s.get_error_msg();
@@ -1013,7 +1013,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
     } else {
         for (auto channel : _channels) {
             channel->for_each_node_channel(
-                    [&status](NodeChannel* ch) { ch->cancel(status.get_error_msg()); });
+                    [&status](const std::shared_ptr<NodeChannel>& ch) { ch->cancel(status.get_error_msg()); });
         }
     }
 
@@ -1214,7 +1214,7 @@ void OlapTableSink::_send_batch_process() {
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) {
+            index_channel->for_each_node_channel([&running_channels_num, this](const std::shared_ptr<NodeChannel>& ch) {
                 running_channels_num +=
                         ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
             });
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 37d1caf39b80d5..b31f3841c337a7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -288,13 +288,13 @@ class IndexChannel {
 
     void add_row(BlockRow& block_row, int64_t tablet_id);
 
-    void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
+    void for_each_node_channel(const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
         for (auto& it : _node_channels) {
             func(it.second);
         }
     }
 
-    void mark_as_failed(const NodeChannel* ch, const std::string& err, int64_t tablet_id = -1);
+    void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id = -1);
     Status check_intolerable_failure();
     // set error tablet info in runtime state, so that it can be returned to FE.
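
Note on patch 6: passing node_id and host by value means the failure bookkeeping no
longer dereferences a NodeChannel that a concurrent close may already have destroyed;
the channel's identity is copied into the tracker instead. A distilled,
self-contained sketch of that bookkeeping (FailureTracker and its fields are
illustrative, not Doris code), including the (num_replicas + 1) / 2 majority
threshold used above:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    // Failure bookkeeping keyed by plain values (node id, host string);
    // nothing here can dangle if the channel that reported the failure is gone.
    struct FailureTracker {
        std::map<int64_t, std::set<int64_t>> failed_nodes_by_tablet;
        std::map<int64_t, std::string> failure_msgs;
        int num_replicas = 3;

        // Returns true once a majority of a tablet's replicas has failed,
        // mirroring the intolerable-failure check in IndexChannel.
        bool mark_as_failed(int64_t node_id, const std::string& host,
                            const std::string& err, int64_t tablet_id) {
            failed_nodes_by_tablet[tablet_id].insert(node_id);
            failure_msgs.emplace(tablet_id, err + ", host: " + host);
            return failed_nodes_by_tablet[tablet_id].size() >=
                   static_cast<size_t>((num_replicas + 1) / 2);
        }
    };

    int main() {
        FailureTracker tracker;
        std::cout << tracker.mark_as_failed(10001, "10.0.0.1", "rpc failed", 42)   // 0: 1 of 3
                  << tracker.mark_as_failed(10002, "10.0.0.2", "rpc failed", 42)   // 1: majority
                  << "\n";
        return 0;
    }
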
From fb21ee0f234a99d876b0e7872dc82c714dbdd5d6 Mon Sep 17 00:00:00 2001
From: morningman
Date: Mon, 14 Feb 2022 09:53:20 +0800
Subject: [PATCH 7/8] rebase

---
 be/test/vec/aggregate_functions/agg_test.cpp | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/be/test/vec/aggregate_functions/agg_test.cpp b/be/test/vec/aggregate_functions/agg_test.cpp
index 9e63257e4bcfea..b0d3c0656415ab 100644
--- a/be/test/vec/aggregate_functions/agg_test.cpp
+++ b/be/test/vec/aggregate_functions/agg_test.cpp
@@ -95,9 +95,6 @@ TEST(AggTest, topn_test) {
             "46,\"10\":37}";
     ASSERT_EQ(result, expect_result);
     agg_function->destroy(place);
-    if (place) {
-        delete[] place;
-    }
 }
 
 } // namespace doris::vectorized

From 667cc211ba645a7266ee64fcc55d5d770895b361 Mon Sep 17 00:00:00 2001
From: morningman
Date: Mon, 14 Feb 2022 10:54:06 +0800
Subject: [PATCH 8/8] fix ut

---
 .../org/apache/doris/clone/TabletReplicaTooSlowTest.java | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index 864d8fb8bfa901..875ffe1ab7a3a6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -20,14 +20,11 @@
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
@@ -52,8 +49,6 @@
 import java.util.Random;
 import java.util.UUID;
 
-import mockit.Mocked;
-
 public class TabletReplicaTooSlowTest {
     private static final Logger LOG = LogManager.getLogger(TabletReplicaTooSlowTest.class);
     // use a unique dir so that it won't be conflict with other unit test which
@@ -78,6 +73,7 @@ public static void beforeClass() throws Exception {
         FeConstants.runningUnitTest = true;
         FeConstants.tablet_checker_interval_ms = 1000;
         Config.tablet_repair_delay_factor_second = 1;
+        Config.repair_slow_replica = true;
         // 5 backends:
         // 127.0.0.1
         // 127.0.0.2