From ac65fd745de72388b1d4db0db57c813580b310c4 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 12 Feb 2025 15:59:03 +0800 Subject: [PATCH] ensure abnormal jobs do not interfere with normal task scheduling --- .../load/routineload/KafkaRoutineLoadJob.java | 89 +++++++++++-------- .../load/routineload/RoutineLoadJob.java | 16 +++- .../routineload/RoutineLoadManagerTest.java | 3 +- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index d0843eb92044f0..dcc10a06a8d3dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -371,18 +371,19 @@ protected void unprotectUpdateProgress() throws UserException { } @Override - protected void preCheckNeedSchedule() throws UserException { + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { // If user does not specify kafka partition, // We will fetch partition from kafka server periodically - if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) { if (customKafkaPartitions != null && !customKafkaPartitions.isEmpty()) { - return; + return true; } - updateKafkaPartitions(); + return updateKafkaPartitions(); } + return true; } - private void updateKafkaPartitions() throws UserException { + private boolean updateKafkaPartitions() throws UserException { try { this.newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { @@ -397,7 +398,9 @@ private void updateKafkaPartitions() throws UserException { new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg), false /* not replay */); } + return false; } + return true; } // if customKafkaPartition is not null, then return false immediately @@ -409,33 +412,20 @@ private void updateKafkaPartitions() throws UserException { protected boolean unprotectNeedReschedule() throws UserException { // only running and need_schedule job need to be changed current kafka partitions if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { - if (CollectionUtils.isNotEmpty(customKafkaPartitions)) { - currentKafkaPartitions = customKafkaPartitions; - return false; - } - // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() - Preconditions.checkNotNull(this.newCurrentKafkaPartition); - if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) { - if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { - currentKafkaPartitions = this.newCurrentKafkaPartition; - if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) - .add("msg", "current kafka partitions has been change") - .build()); - } - return true; - } else { - // if the partitions of currentKafkaPartitions and progress are inconsistent, - // We should also update the progress - for (Integer kafkaPartition : currentKafkaPartitions) { - if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { - return true; - } - } - return false; - } - } else { + return isKafkaPartitionsChanged(); + } + return false; + } + + private boolean isKafkaPartitionsChanged() throws UserException { + if (CollectionUtils.isNotEmpty(customKafkaPartitions)) { + currentKafkaPartitions = customKafkaPartitions; + return false; + } + // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() + Preconditions.checkNotNull(this.newCurrentKafkaPartition); + if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) { + if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { currentKafkaPartitions = this.newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -444,14 +434,39 @@ protected boolean unprotectNeedReschedule() throws UserException { .build()); } return true; + } else { + // if the partitions of currentKafkaPartitions and progress are inconsistent, + // We should also update the progress + for (Integer kafkaPartition : currentKafkaPartitions) { + if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { + return true; + } + } + return false; } - - } - if (this.state == JobState.PAUSED) { - return ScheduleRule.isNeedAutoSchedule(this); + } else { + currentKafkaPartitions = this.newCurrentKafkaPartition; + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); + } + return true; } - return false; + } + @Override + protected boolean needAutoResume() { + writeLock(); + try { + if (this.state == JobState.PAUSED) { + return ScheduleRule.isNeedAutoSchedule(this); + } + return false; + } finally { + writeUnlock(); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 1ee4fbee123459..b4ba4bcc48de4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1510,11 +1510,15 @@ public void update() throws UserException { } } - preCheckNeedSchedule(); + boolean needAutoResume = needAutoResume(); + + if (!refreshKafkaPartitions(needAutoResume)) { + return; + } writeLock(); try { - if (unprotectNeedReschedule()) { + if (unprotectNeedReschedule() || needAutoResume) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("msg", "Job need to be rescheduled") .build()); @@ -1530,8 +1534,8 @@ public void update() throws UserException { // Because unprotectUpdateProgress() is protected by writelock. // So if there are time-consuming operations, they should be done in this method. // (Such as getAllKafkaPartitions() in KafkaRoutineLoad) - protected void preCheckNeedSchedule() throws UserException { - + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { + return false; } protected void unprotectUpdateProgress() throws UserException { @@ -1541,6 +1545,10 @@ protected boolean unprotectNeedReschedule() throws UserException { return false; } + protected boolean needAutoResume() { + return false; + } + public void setOrigStmt(OriginStatement origStmt) { this.origStmt = origStmt; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 01449d887e5345..2c238fa397bce7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -647,8 +647,7 @@ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutin throw new UserException("thread sleep failed"); } routineLoadManager.updateRoutineLoadJob(); - Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); - Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); + Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());