Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,19 @@ protected void unprotectUpdateProgress() throws UserException {
}

@Override
protected void preCheckNeedSchedule() throws UserException {
protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException {
// If user does not specify kafka partition,
// We will fetch partition from kafka server periodically
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) {
if (customKafkaPartitions != null && !customKafkaPartitions.isEmpty()) {
return;
return true;
}
updateKafkaPartitions();
return updateKafkaPartitions();
}
return true;
}

private void updateKafkaPartitions() throws UserException {
private boolean updateKafkaPartitions() throws UserException {
try {
this.newCurrentKafkaPartition = getAllKafkaPartitions();
} catch (Exception e) {
Expand All @@ -401,7 +402,9 @@ private void updateKafkaPartitions() throws UserException {
new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
false /* not replay */);
}
return false;
}
return true;
}

// if customKafkaPartition is not null, then return false immediately
Expand All @@ -413,33 +416,20 @@ private void updateKafkaPartitions() throws UserException {
protected boolean unprotectNeedReschedule() throws UserException {
// only running and need_schedule job need to be changed current kafka partitions
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
currentKafkaPartitions = customKafkaPartitions;
return false;
}
// the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler()
Preconditions.checkNotNull(this.newCurrentKafkaPartition);
if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) {
currentKafkaPartitions = this.newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
}
return true;
} else {
// if the partitions of currentKafkaPartitions and progress are inconsistent,
// We should also update the progress
for (Integer kafkaPartition : currentKafkaPartitions) {
if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
return true;
}
}
return false;
}
} else {
return isKafkaPartitionsChanged();
}
return false;
}

private boolean isKafkaPartitionsChanged() throws UserException {
if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
currentKafkaPartitions = customKafkaPartitions;
return false;
}
// the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler()
Preconditions.checkNotNull(this.newCurrentKafkaPartition);
if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) {
currentKafkaPartitions = this.newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
Expand All @@ -448,14 +438,39 @@ protected boolean unprotectNeedReschedule() throws UserException {
.build());
}
return true;
} else {
// if the partitions of currentKafkaPartitions and progress are inconsistent,
// We should also update the progress
for (Integer kafkaPartition : currentKafkaPartitions) {
if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
return true;
}
}
return false;
}

}
if (this.state == JobState.PAUSED) {
return ScheduleRule.isNeedAutoSchedule(this);
} else {
currentKafkaPartitions = this.newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
}
return true;
}
return false;
}

@Override
protected boolean needAutoResume() {
    // Take the write lock so the state check and the schedule-rule
    // evaluation observe a consistent job state.
    writeLock();
    try {
        // Only a paused job is a candidate for automatic resumption,
        // and only when the schedule rule says it is due.
        return this.state == JobState.PAUSED && ScheduleRule.isNeedAutoSchedule(this);
    } finally {
        writeUnlock();
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1521,11 +1521,15 @@ public void update() throws UserException {
}
}

preCheckNeedSchedule();
boolean needAutoResume = needAutoResume();

if (!refreshKafkaPartitions(needAutoResume)) {
return;
}

writeLock();
try {
if (unprotectNeedReschedule()) {
if (unprotectNeedReschedule() || needAutoResume) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("msg", "Job need to be rescheduled")
.build());
Expand All @@ -1542,8 +1546,8 @@ public void update() throws UserException {
// Because unprotectUpdateProgress() is protected by writelock.
// So if there are time-consuming operations, they should be done in this method.
// (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
protected void preCheckNeedSchedule() throws UserException {

protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException {
return false;
}

protected void unprotectUpdateProgress() throws UserException {
Expand All @@ -1553,6 +1557,10 @@ protected boolean unprotectNeedReschedule() throws UserException {
return false;
}

// Default: the base job type never requests auto-resume. The Kafka job
// overrides this to consult ScheduleRule when the job is PAUSED.
protected boolean needAutoResume() {
    return false;
}

// Stores the original statement this job was created from.
public void setOrigStmt(OriginStatement origStmt) {
    this.origStmt = origStmt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,7 @@ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutin
throw new UserException("thread sleep failed");
}
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
}
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
Expand Down
Loading