Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,23 @@ public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (pauseRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
// it needs lock when getting routine load job,
// otherwise, it may cause the editLog out of order in the following scenarios:
// thread A: create job and record job meta
// thread B: change job state and persist in editlog according to meta
// thread A: persist in editlog
// which will cause the null pointer exception when replaying editLog
readLock();
try {
if (pauseRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}
} finally {
readUnlock();
}

for (RoutineLoadJob routineLoadJob : jobs) {
Expand Down Expand Up @@ -373,8 +384,20 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th

public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
stopRoutineLoadStmt.getName());
RoutineLoadJob routineLoadJob;
// it needs lock when getting routine load job,
// otherwise, it may cause the editLog out of order in the following scenarios:
// thread A: create job and record job meta
// thread B: change job state and persist in editlog according to meta
// thread A: persist in editlog
// which will cause the null pointer exception when replaying editLog
readLock();
try {
routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
stopRoutineLoadStmt.getName());
} finally {
readUnlock();
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
Expand Down Expand Up @@ -796,6 +819,9 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
job.updateState(operation.getJobState(), null, true /* is replay */);
} catch (UserException e) {
LOG.error("should not happened", e);
} catch (NullPointerException npe) {
LOG.error("cannot get job when replaying state change job, which is unexpected, job id: "
+ operation.getId());
}
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
.add("current_state", operation.getJobState())
Expand All @@ -807,7 +833,19 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
* Enter of altering a routine load job
*/
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
RoutineLoadJob job;
// it needs lock when getting routine load job,
// otherwise, it may cause the editLog out of order in the following scenarios:
// thread A: create job and record job meta
// thread B: change job state and persist in editlog according to meta
// thread A: persist in editlog
// which will cause the null pointer exception when replaying editLog
readLock();
try {
job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
} finally {
readUnlock();
}
if (stmt.hasDataSourceProperty()
&& !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) {
throw new DdlException("The specified job type is not: "
Expand Down