diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 2fe349294dd044..590e1136d921dc 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -183,12 +183,16 @@ Status NodeChannel::open_wait() {
     // add batch closure
     _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
-    _add_batch_closure->addFailedHandler([this]() {
+    _add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
         // If rpc failed, mark all tablets on this node channel as failed
-        _index_channel->mark_as_failed(this, _add_batch_closure->cntl.ErrorText(), -1);
+        _index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1);
         Status st = _index_channel->check_intolerable_failure();
         if (!st.ok()) {
             _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+        } else if (is_last_rpc) {
+            // if this is the last rpc, _add_batches_finished must be set; otherwise the node
+            // channel's close_wait will be blocked.
+            _add_batches_finished = true;
         }
     });
@@ -198,7 +202,7 @@ Status NodeChannel::open_wait() {
         if (status.ok()) {
             // if has error tablet, handle them first
             for (auto& error : result.tablet_errors()) {
-                _index_channel->mark_as_failed(this, error.msg(), error.tablet_id());
+                _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), error.tablet_id());
             }
 
             Status st = _index_channel->check_intolerable_failure();
@@ -493,7 +497,7 @@ void NodeChannel::try_send_batch() {
         // eos request must be the last request
         _add_batch_closure->end_mark();
         _send_finished = true;
-        DCHECK(_pending_batches_num == 0);
+        CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
     if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
@@ -541,13 +545,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
-        std::vector<NodeChannel*> channels;
+        std::vector<std::shared_ptr<NodeChannel>> channels;
         for (auto& node_id : location->node_ids) {
-            NodeChannel* channel = nullptr;
+            std::shared_ptr<NodeChannel> channel;
             auto it = _node_channels.find(node_id);
             if (it == _node_channels.end()) {
-                channel =
-                        _parent->_pool->add(new NodeChannel(_parent, this, node_id, _schema_hash));
+                // NodeChannel is not added to _parent->_pool.
+                // The destruction of a NodeChannel may take a long time waiting for rpcs to finish,
+                // while the ObjectPool holds a spin lock to delete objects.
+                channel = std::make_shared<NodeChannel>(_parent, this, node_id, _schema_hash);
                 _node_channels.emplace(node_id, channel);
             } else {
                 channel = it->second;
             }
@@ -571,7 +577,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(tuple, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel, st.get_error_msg(), tablet_id);
+            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
             // continue add row to other node, the error will be checked for every batch outside
         }
     }
@@ -586,14 +592,14 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(block_row, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel, st.get_error_msg(), tablet_id);
+            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
         }
     }
 }
 
-void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& err,
+void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
                                   int64_t tablet_id) {
-    const auto& it = _tablets_by_channel.find(ch->node_id());
+    const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
     }
@@ -602,16 +608,16 @@ void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& err,
     std::lock_guard l(_fail_lock);
     if (tablet_id == -1) {
         for (const auto the_tablet_id : it->second) {
-            _failed_channels[the_tablet_id].insert(ch->node_id());
-            _failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + ch->host());
+            _failed_channels[the_tablet_id].insert(node_id);
+            _failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + host);
             if (_failed_channels[the_tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
                 _intolerable_failure_status =
                         Status::InternalError(_failed_channels_msgs[the_tablet_id]);
             }
         }
     } else {
-        _failed_channels[tablet_id].insert(ch->node_id());
-        _failed_channels_msgs.emplace(tablet_id, err + ", host: " + ch->host());
+        _failed_channels[tablet_id].insert(node_id);
+        _failed_channels_msgs.emplace(tablet_id, err + ", host: " + host);
         if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
             _intolerable_failure_status =
                     Status::InternalError(_failed_channels_msgs[tablet_id]);
@@ -655,7 +661,7 @@ OlapTableSink::~OlapTableSink() {
         // OlapTableSink::_mem_tracker and its parents.
         // But their destructions are after OlapTableSink's.
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([](NodeChannel* ch) { ch->clear_all_batches(); });
+            index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_batches(); });
         }
     }
@@ -809,17 +815,17 @@ Status OlapTableSink::open(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
 
     for (auto index_channel : _channels) {
-        index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); });
+        index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->open(); });
     }
 
     for (auto index_channel : _channels) {
-        index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) {
+        index_channel->for_each_node_channel([&index_channel](const std::shared_ptr<NodeChannel>& ch) {
             auto st = ch->open_wait();
             if (!st.ok()) {
                 // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel.
                 // This phase will not fail due to a single tablet.
                 // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed.
-                index_channel->mark_as_failed(ch,
+                index_channel->mark_as_failed(ch->node_id(), ch->host(),
                                               fmt::format("{}, open failed, err: {}",
                                                           ch->channel_info(), st.get_error_msg()),
                                               -1);
@@ -935,7 +941,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         {
             SCOPED_TIMER(_close_timer);
             for (auto index_channel : _channels) {
-                index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
+                index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->mark_close(); });
                 num_node_channels += index_channel->num_node_channels();
             }
 
@@ -945,10 +951,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                     [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
                      &mem_exceeded_block_ns, &queue_push_lock_ns, &actual_consume_ns,
                      &total_add_batch_exec_time_ns, &add_batch_exec_time,
-                     &total_add_batch_num](NodeChannel* ch) {
+                     &total_add_batch_num](const std::shared_ptr<NodeChannel>& ch) {
                         auto s = ch->close_wait(state);
                         if (!s.ok()) {
-                            index_channel->mark_as_failed(ch, s.get_error_msg(), -1);
+                            index_channel->mark_as_failed(ch->node_id(), ch->host(), s.get_error_msg(), -1);
                             LOG(WARNING) << ch->channel_info()
                                          << ", close channel failed, err: " << s.get_error_msg();
@@ -1007,7 +1013,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
     } else {
         for (auto channel : _channels) {
             channel->for_each_node_channel(
-                    [&status](NodeChannel* ch) { ch->cancel(status.get_error_msg()); });
+                    [&status](const std::shared_ptr<NodeChannel>& ch) { ch->cancel(status.get_error_msg()); });
         }
     }
@@ -1208,7 +1214,7 @@ void OlapTableSink::_send_batch_process() {
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) {
+            index_channel->for_each_node_channel([&running_channels_num, this](const std::shared_ptr<NodeChannel>& ch) {
                 running_channels_num +=
                         ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
             });
@@ -1216,7 +1222,7 @@
 
         if (running_channels_num == 0) {
             LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), "
-                         "consumer thread exit.";
+                         "sender thread exit. " << print_id(_load_id);
             return;
         }
     } while (!_stop_background_threads_latch.wait_for(
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 10655882a2e6e2..b31f3841c337a7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -92,7 +92,7 @@ class ReusableClosure : public google::protobuf::Closure {
     static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
 
-    void addFailedHandler(std::function<void()> fn) { failed_handler = fn; }
+    void addFailedHandler(std::function<void(bool)> fn) { failed_handler = fn; }
     void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
 
     void join() {
@@ -126,7 +126,7 @@ class ReusableClosure : public google::protobuf::Closure {
         if (cntl.Failed()) {
             LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode())
                          << ", error_text=" << cntl.ErrorText();
-            failed_handler();
+            failed_handler(_is_last_rpc);
         } else {
             success_handler(result, _is_last_rpc);
         }
@@ -140,7 +140,7 @@ class ReusableClosure : public google::protobuf::Closure {
     brpc::CallId cid;
     std::atomic<bool> _packet_in_flight {false};
     std::atomic<bool> _is_last_rpc {false};
-    std::function<void()> failed_handler;
+    std::function<void(bool)> failed_handler;
     std::function<void(const T&, bool)> success_handler;
 };
 
@@ -288,13 +288,13 @@ class IndexChannel {
 
     void add_row(BlockRow& block_row, int64_t tablet_id);
 
-    void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
+    void for_each_node_channel(const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
         for (auto& it : _node_channels) {
             func(it.second);
         }
     }
 
-    void mark_as_failed(const NodeChannel* ch, const std::string& err, int64_t tablet_id = -1);
+    void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id = -1);
     Status check_intolerable_failure();
 
     // set error tablet info in runtime state, so that it can be returned to FE.
@@ -310,9 +310,9 @@ class IndexChannel {
     int32_t _schema_hash;
 
     // BeId -> channel
-    std::unordered_map<int64_t, NodeChannel*> _node_channels;
+    std::unordered_map<int64_t, std::shared_ptr<NodeChannel>> _node_channels;
     // from tablet_id to backend channel
-    std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+    std::unordered_map<int64_t, std::vector<std::shared_ptr<NodeChannel>>> _channels_by_tablet;
     // from backend channel to tablet_id
     std::unordered_map<int64_t, std::set<int64_t>> _tablets_by_channel;
 
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 249e554fd7a06c..6e6195d951e23b 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -601,8 +601,8 @@ void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
             for (const auto& it : temp.sub_predicates()) {
                 del_cond_str += it + ";";
             }
-            LOG(INFO) << "remove one del_pred. version=" << temp.version()
-                      << ", condition=" << del_cond_str;
+            VLOG_NOTICE << "remove one del_pred. version=" << temp.version()
+                        << ", condition=" << del_cond_str;
 
             // remove delete condition from PB
             _del_pred_array.SwapElements(ordinal, _del_pred_array.size() - 1);
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 139740b9c3786e..eb223d661d5d7e 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -254,7 +254,10 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
     // row_tuples
     _row_desc.to_protobuf(output_batch->mutable_row_tuples());
     // tuple_offsets: must clear before reserve
+    // TODO(cmy): tuple_offsets should be removed after v1.1.0; use new_tuple_offsets instead.
+    // tuple_offsets is kept here only for compatibility.
     output_batch->clear_tuple_offsets();
+    output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
     output_batch->clear_new_tuple_offsets();
     output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
     // is_compressed
@@ -279,7 +282,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
     int64_t offset = 0; // current offset into output_batch->tuple_data
     char* tuple_data = mutable_tuple_data->data();
     const auto& tuple_descs = _row_desc.tuple_descriptors();
-    const auto& mutable_tuple_offsets = output_batch->mutable_new_tuple_offsets();
+    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+    const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
@@ -288,10 +292,12 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, s
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
                 mutable_tuple_offsets->Add(-1);
+                mutable_new_tuple_offsets->Add(-1);
                 continue;
             }
             // Record offset before creating copy (which increments offset and tuple_data)
-            mutable_tuple_offsets->Add(offset);
+            mutable_tuple_offsets->Add((int32_t) offset);
+            mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             CHECK_LE(offset, size);
         }
@@ -519,7 +525,9 @@ vectorized::Block RowBatch::convert_to_vec_block() const {
 size_t RowBatch::get_batch_size(const PRowBatch& batch) {
     size_t result = batch.tuple_data().size();
     result += batch.row_tuples().size() * sizeof(int32_t);
+    // TODO(cmy): remove batch.tuple_offsets
     result += batch.tuple_offsets().size() * sizeof(int32_t);
+    result += batch.new_tuple_offsets().size() * sizeof(int64_t);
     return result;
 }
diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 33eaf223847f07..0d679dcb61fd37 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -2096,7 +2096,7 @@ When there are a large number of replicas waiting to be balanced or repaired in
 
 ### repair_slow_replica
 
-Default: true
+Default: false
 
 IsMutable:true
 
@@ -2153,4 +2153,4 @@ Dynamically configured: true
 
 Only for Master FE: true
 
-The data size threshold used to judge whether replica is too large
\ No newline at end of file
+The data size threshold used to judge whether replica is too large
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index 6e73f402530da7..0e3c5191909a44 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2116,7 +2116,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 
 ### repair_slow_replica
 
-默认值:true
+默认值:false
 
 是否可以动态配置:true
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 21a84308804b1a..b25b5f35b916c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1572,7 +1572,7 @@ public class Config extends ConfigBase {
      * auto set the slowest compaction replica's status to bad
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static boolean repair_slow_replica = true;
+    public static boolean repair_slow_replica = false;
 
     /*
      * The relocation of a colocation group may involve a large number of tablets moving within the cluster.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index f97d25e9b21af9..ae6d8b3c6d4a02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -123,14 +123,14 @@ public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
             throws UserException {
         // check load auth
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                                                                createRoutineLoadStmt.getDBName(),
-                                                                createRoutineLoadStmt.getTableName(),
-                                                                PrivPredicate.LOAD)) {
+                createRoutineLoadStmt.getDBName(),
+                createRoutineLoadStmt.getTableName(),
+                PrivPredicate.LOAD)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                                                ConnectContext.get().getQualifiedUser(),
-                                                ConnectContext.get().getRemoteIP(),
-                                                createRoutineLoadStmt.getDBName(),
-                                                createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    createRoutineLoadStmt.getDBName(),
+                    createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
         }
 
         RoutineLoadJob routineLoadJob = null;
@@ -220,13 +220,13 @@ public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
             throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
         }
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                                                                dbFullName,
-                                                                tableName,
-                                                                PrivPredicate.LOAD)) {
+                dbFullName,
+                tableName,
+                PrivPredicate.LOAD)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                                                ConnectContext.get().getQualifiedUser(),
-                                                ConnectContext.get().getRemoteIP(),
-                                                dbFullName + ": " + tableName);
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    dbFullName + ": " + tableName);
         }
         return routineLoadJob;
     }
@@ -336,10 +336,10 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
                         "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
                 false /* not replay */);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
-                         .add("current_state", routineLoadJob.getState())
-                         .add("user", ConnectContext.get().getQualifiedUser())
-                         .add("msg", "routine load job has been stopped by user")
-                         .build());
+                .add("current_state", routineLoadJob.getState())
+                .add("user", ConnectContext.get().getQualifiedUser())
+                .add("msg", "routine load job has been stopped by user")
+                .build());
     }
 
     public int getSizeOfIdToRoutineLoadTask() {
@@ -394,7 +394,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
                 }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
-                              beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
+                            beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
                 }
                 result = maxIdleSlotNum < idleTaskNum ? beId : result;
                 maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
@@ -426,7 +426,7 @@ public long getAvailableBeForTask(long previoudBeId, String clusterName) throws
         // 1. Find if the given BE id has available slots
         if (previoudBeId != -1L) {
             int idleTaskNum = 0;
-            if (beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
+            if (!beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
                 idleTaskNum = 0;
             } else if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
                 idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
@@ -594,7 +594,7 @@ public void cleanOldRoutineLoadJobs() {
                 iterator.remove();
 
                 RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
-                                                                          routineLoadJob.getState());
+                        routineLoadJob.getState());
                 Catalog.getCurrentCatalog().getEditLog().logRemoveRoutineLoadJob(operation);
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
                         .add("end_timestamp", routineLoadJob.getEndTimestamp())
@@ -643,8 +643,8 @@ public void updateRoutineLoadJob() throws UserException {
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         unprotectedAddJob(routineLoadJob);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
-                         .add("msg", "replay create routine load job")
-                         .build());
+                .add("msg", "replay create routine load job")
+                .build());
     }
 
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
@@ -655,11 +655,11 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
             LOG.error("should not happened", e);
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
-                         .add("current_state", operation.getJobState())
-                         .add("msg", "replay change routine load job")
-                         .build());
+                .add("current_state", operation.getJobState())
+                .add("msg", "replay change routine load job")
+                .build());
     }
-    
+
     /**
      * Enter of altering a routine load job
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 449aa7fd63c4da..e3c9feabad9c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -184,7 +184,7 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
             result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
-            LOG.info("ReportExecStatus() runtime error, query {} does not exist", params.query_id);
+            LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
             return result;
         }
         try {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index 864d8fb8bfa901..875ffe1ab7a3a6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -20,14 +20,11 @@
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
@@ -52,8 +49,6 @@
 import java.util.Random;
 import java.util.UUID;
 
-import mockit.Mocked;
-
 public class TabletReplicaTooSlowTest {
     private static final Logger LOG = LogManager.getLogger(TabletReplicaTooSlowTest.class);
     // use a unique dir so that it won't be conflict with other unit test which
@@ -78,6 +73,7 @@ public static void beforeClass() throws Exception {
         FeConstants.runningUnitTest = true;
         FeConstants.tablet_checker_interval_ms = 1000;
         Config.tablet_repair_delay_factor_second = 1;
+        Config.repair_slow_replica = true;
         // 5 backends:
         // 127.0.0.1
         // 127.0.0.2