diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 4332ce5c1db35d..3be2c09e8a8d91 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -68,6 +68,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { [this, &result_st] (const Status& st) { std::unique_lock lock(_mutex); _counter--; + VLOG(1) << "group counter is: " << _counter << ", grp: " << _grp_id; if (_counter == 0) { _queue.shutdown(); LOG(INFO) << "all consumers are finished. shutdown queue. group id: " << _grp_id; @@ -117,7 +118,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { // shutdown queue _queue.shutdown(); // cancel all consumers - for(auto& consumer : _consumers) { consumer->cancel(ctx); } + for (auto& consumer : _consumers) { consumer->cancel(ctx); } + + // waiting all threads finished + _thread_pool.join(); if (!result_st.ok()) { // some of consumers encounter errors, cancel this task @@ -158,7 +162,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { << " - " << msg->offset() << "]"; } else { // failed to append this msg, we must stop - LOG(WARNING) << "failed to append msg to pipe"; + LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; eos = true; } delete msg; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index ec0afa4c55d3be..333768bf1c1180 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -41,6 +41,11 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { return Status::OK; } + if (_thread_pool.get_queue_size() > 100) { + LOG(INFO) << "too many tasks in queue: " << _thread_pool.get_queue_size() << ", reject task: " << UniqueId(task.id); + return Status("too many tasks"); + } + // create the context StreamLoadContext* ctx = new StreamLoadContext(_exec_env); ctx->load_type = TLoadType::ROUTINE_LOAD; @@ -131,7 +136,7 @@ void RoutineLoadTaskExecutor::exec_task( } \ } while (false); - VLOG(1) << "begin to execute routine load task: " << ctx->brief(); + LOG(INFO) << "begin to execute routine load task: " << ctx->brief(); // create data consumer group std::shared_ptr consumer_grp; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 604a83e17e3cc0..63511f25600a64 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -42,7 +42,7 @@ class RoutineLoadTaskExecutor { RoutineLoadTaskExecutor(ExecEnv* exec_env): _exec_env(exec_env), - _thread_pool(10, 1000), + _thread_pool(10, 100), _data_consumer_pool(10) { _data_consumer_pool.start_bg_worker(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/src/main/java/org/apache/doris/catalog/Tablet.java index 4d41aed9341482..f0de871b5b33f7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/src/main/java/org/apache/doris/catalog/Tablet.java @@ -161,10 +161,18 @@ public Set getBackendIds() { return beIds; } - public List getAliveBackendIdsList() { + // for loading data + public List getNormalReplicaBackendIds() { List beIds = Lists.newArrayList(); + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); for (Replica replica : replicas) { - if (Catalog.getCurrentSystemInfo().checkBackendAlive(replica.getBackendId())) { + if (replica.isBad()) { + continue; + } + + ReplicaState state = replica.getState(); + if (infoService.checkBackendAlive(replica.getBackendId()) + && (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE)) { beIds.add(replica.getBackendId()); } } diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index 0c9fb3adbb3279..0325f5792f6ed2 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -286,9 +286,9 @@ private TOlapTableLocationParam createLocation(OlapTable table) throws UserExcep int quorum = table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1; for (MaterializedIndex index : partition.getMaterializedIndices()) { // we should ensure the replica backend is alive - //otherwise, there will be a 'unknown node id, id=xxx' error for stream load + // otherwise, there will be a 'unknown node id, id=xxx' error for stream load for (Tablet tablet : index.getTablets()) { - List beIds = tablet.getAliveBackendIdsList(); + List beIds = tablet.getNormalReplicaBackendIds(); if (beIds.size() < quorum) { throw new UserException("tablet " + tablet.getId() + " has few replicas: " + beIds.size()); }