From 8dc85084cd248e5a402e06a7ed926bed9ceb50c2 Mon Sep 17 00:00:00 2001
From: hui lai
Date: Thu, 13 Feb 2025 10:12:07 +0800
Subject: [PATCH] [improve](routine load) ensure abnormal jobs do not interfere
 with normal task scheduling (#47530)

When a routine load job is created and successfully acquires all partition
information, but then encounters persistent timeouts reaching the Kafka
brokers (e.g., due to network routing configuration changes), the abnormal
job can interfere with the scheduling of all routine load tasks.
---
 .../load/routineload/KafkaRoutineLoadJob.java | 89 +++++++++++--------
 .../load/routineload/RoutineLoadJob.java      | 16 +++-
 .../routineload/RoutineLoadManagerTest.java   |  3 +-
 3 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 0376cd3f366c3c..6952a32b8b3c9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -375,18 +375,19 @@ protected void unprotectUpdateProgress() throws UserException {
     }
 
     @Override
-    protected void preCheckNeedSchedule() throws UserException {
+    protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException {
         // If user does not specify kafka partition,
         // We will fetch partition from kafka server periodically
-        if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
+        if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) {
             if (customKafkaPartitions != null && !customKafkaPartitions.isEmpty()) {
-                return;
+                return true;
             }
-            updateKafkaPartitions();
+            return updateKafkaPartitions();
         }
+        return true;
     }
 
-    private void updateKafkaPartitions() throws UserException {
+    private boolean updateKafkaPartitions() throws UserException {
         try {
             this.newCurrentKafkaPartition = getAllKafkaPartitions();
         } catch (Exception e) {
@@ -401,7 +402,9 @@ private void updateKafkaPartitions() throws UserException {
                         new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
                         false /* not replay */);
             }
+            return false;
         }
+        return true;
     }
 
     // if customKafkaPartition is not null, then return false immediately
@@ -413,33 +416,20 @@ private void updateKafkaPartitions() throws UserException {
     protected boolean unprotectNeedReschedule() throws UserException {
         // only running and need_schedule job need to be changed current kafka partitions
         if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
-            if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
-                currentKafkaPartitions = customKafkaPartitions;
-                return false;
-            }
-            // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler()
-            Preconditions.checkNotNull(this.newCurrentKafkaPartition);
-            if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
-                if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) {
-                    currentKafkaPartitions = this.newCurrentKafkaPartition;
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                                .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
-                                .add("msg", "current kafka partitions has been change")
-                                .build());
-                    }
-                    return true;
-                } else {
-                    // if the partitions of currentKafkaPartitions and progress are inconsistent,
-                    // We should also update the progress
-                    for (Integer kafkaPartition : currentKafkaPartitions) {
-                        if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
-                            return true;
-                        }
-                    }
-                    return false;
-                }
-            } else {
+            return isKafkaPartitionsChanged();
+        }
+        return false;
+    }
+
+    private boolean isKafkaPartitionsChanged() throws UserException {
+        if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
+            currentKafkaPartitions = customKafkaPartitions;
+            return false;
+        }
+        // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler()
+        Preconditions.checkNotNull(this.newCurrentKafkaPartition);
+        if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
+            if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) {
                 currentKafkaPartitions = this.newCurrentKafkaPartition;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -448,14 +438,39 @@ protected boolean unprotectNeedReschedule() throws UserException {
                             .build());
                 }
                 return true;
+            } else {
+                // if the partitions of currentKafkaPartitions and progress are inconsistent,
+                // We should also update the progress
+                for (Integer kafkaPartition : currentKafkaPartitions) {
+                    if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
+                        return true;
+                    }
+                }
+                return false;
             }
-
-        }
-        if (this.state == JobState.PAUSED) {
-            return ScheduleRule.isNeedAutoSchedule(this);
+        } else {
+            currentKafkaPartitions = this.newCurrentKafkaPartition;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
+                        .add("msg", "current kafka partitions has been change")
+                        .build());
+            }
+            return true;
         }
-        return false;
+    }
+
+    @Override
+    protected boolean needAutoResume() {
+        writeLock();
+        try {
+            if (this.state == JobState.PAUSED) {
+                return ScheduleRule.isNeedAutoSchedule(this);
+            }
+            return false;
+        } finally {
+            writeUnlock();
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8de1e5551400f6..24f1252665236f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1521,11 +1521,15 @@ public void update() throws UserException {
             }
         }
 
-        preCheckNeedSchedule();
+        boolean needAutoResume = needAutoResume();
+
+        if (!refreshKafkaPartitions(needAutoResume)) {
+            return;
+        }
 
         writeLock();
         try {
-            if (unprotectNeedReschedule()) {
+            if (unprotectNeedReschedule() || needAutoResume) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                         .add("msg", "Job need to be rescheduled")
                         .build());
@@ -1542,8 +1546,8 @@ public void update() throws UserException {
     // Because unprotectUpdateProgress() is protected by writelock.
     // So if there are time-consuming operations, they should be done in this method.
     // (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
-    protected void preCheckNeedSchedule() throws UserException {
-
+    protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException {
+        return false;
     }
 
     protected void unprotectUpdateProgress() throws UserException {
@@ -1553,6 +1557,10 @@ protected boolean unprotectNeedReschedule() throws UserException {
         return false;
     }
 
+    protected boolean needAutoResume() {
+        return false;
+    }
+
     public void setOrigStmt(OriginStatement origStmt) {
         this.origStmt = origStmt;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 01449d887e5345..2c238fa397bce7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -647,8 +647,7 @@ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutin
                 throw new UserException("thread sleep failed");
             }
             routineLoadManager.updateRoutineLoadJob();
-            Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
-            Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
+            Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
         }
         routineLoadManager.updateRoutineLoadJob();
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
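
Note: the heart of this change is that the failure-prone Kafka metadata refresh now reports failure through a boolean return, and update() skips the rest of the per-job update when the refresh fails, so a job that keeps timing out only affects itself. The sketch below is a minimal, self-contained model of that control flow, not Doris code; the names (JobUpdateSketch, MetadataSource, refreshPartitions) are hypothetical stand-ins for the real classes and methods.

import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal model of the scheduling pattern in this patch (hypothetical names, not Doris classes).
public class JobUpdateSketch {

    enum JobState { RUNNING, NEED_SCHEDULE, PAUSED }

    interface MetadataSource {
        // May block for a long time or throw if the brokers are unreachable.
        List<Integer> fetchPartitions() throws Exception;
    }

    static class Job {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile JobState state;
        private final MetadataSource source;
        private List<Integer> partitions = List.of();

        Job(JobState initialState, MetadataSource source) {
            this.state = initialState;
            this.source = source;
        }

        // Analogue of needAutoResume(): a cheap check done under the job lock.
        boolean needAutoResume() {
            lock.writeLock().lock();
            try {
                return state == JobState.PAUSED; // the real rule is more involved (ScheduleRule)
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Analogue of refreshKafkaPartitions(): slow I/O done without holding the lock.
        // Returns false instead of throwing, so the caller can simply skip this job.
        boolean refreshPartitions(boolean needAutoResume) {
            if (state != JobState.RUNNING && state != JobState.NEED_SCHEDULE && !needAutoResume) {
                return true; // nothing to refresh for this job
            }
            try {
                List<Integer> latest = source.fetchPartitions();
                lock.writeLock().lock();
                try {
                    partitions = latest;
                } finally {
                    lock.writeLock().unlock();
                }
                return true;
            } catch (Exception e) {
                // A broken job skips itself; it does not derail the update loop.
                return false;
            }
        }

        // Analogue of update(): the locked section runs only after the slow step succeeded.
        void update() {
            boolean needAutoResume = needAutoResume();
            if (!refreshPartitions(needAutoResume)) {
                return; // skip this job; the scheduler moves on to the next one
            }
            lock.writeLock().lock();
            try {
                if (needAutoResume && !partitions.isEmpty()) {
                    state = JobState.NEED_SCHEDULE; // reschedule with fresh partition info
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        Job broken = new Job(JobState.RUNNING, () -> {
            throw new Exception("metadata fetch timed out");
        });
        Job paused = new Job(JobState.PAUSED, () -> List.of(0, 1, 2));
        for (Job job : List.of(broken, paused)) {
            job.update(); // the broken job is skipped; the paused one gets rescheduled
        }
        System.out.println("paused job is now: " + paused.state); // NEED_SCHEDULE
    }
}

This mirrors why update() in the patch returns early when refreshKafkaPartitions() reports failure: the failing job is handled inside the refresh path, and every other job still gets its turn.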