@@ -27,9 +27,14 @@ under the License.
# PAUSE ROUTINE LOAD
## example

1. Suspend the routine import operation named test 1.
1. Pause the routine load job named test1.

PAUSE ROUTINE LOAD FOR test1;

2. Pause all running routine load jobs.

PAUSE ALL ROUTINE LOAD;

## keyword
PAUSE,ROUTINE,LOAD

PAUSE,ALL,ROUTINE,LOAD
@@ -27,9 +27,14 @@ under the License.
# RESUME ROUTINE LOAD
## example

1. Restore the routine import job named test 1.
1. Resume the routine load job named test1.

RESUME ROUTINE LOAD FOR test1;

2. Resume all paused routine load jobs.

RESUME ALL ROUTINE LOAD;

## keyword
RESUME,ROUTINE,LOAD

RESUME,ALL,ROUTINE,LOAD
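
The PAUSE ALL / RESUME ALL statements documented above are plain SQL, so they can be issued from any MySQL-protocol client. A minimal JDBC sketch is shown below; the host, port, credentials, and database name are placeholders rather than values from this change, and the MySQL Connector/J driver is assumed to be on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hedged sketch: pause and later resume every routine load job in the current database.
// 127.0.0.1:9030, the root user with an empty password, and example_db are placeholders
// for a real FE endpoint; they are not values taken from this PR.
public class PauseResumeAllExample {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://127.0.0.1:9030/example_db";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement()) {
            // Pause every running routine load job in example_db
            stmt.execute("PAUSE ALL ROUTINE LOAD");
            // ... maintenance work ...
            // Put all paused jobs back into the schedule queue
            stmt.execute("RESUME ALL ROUTINE LOAD");
        }
    }
}
```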
@@ -31,6 +31,10 @@ under the License.

PAUSE ROUTINE LOAD FOR test1;

2. Pause all running routine load jobs.

PAUSE ALL ROUTINE LOAD;

## keyword
PAUSE,ROUTINE,LOAD
PAUSE,ALL,ROUTINE,LOAD

@@ -31,6 +31,11 @@ under the License.

RESUME ROUTINE LOAD FOR test1;

2. Resume all paused routine load jobs.

RESUME ALL ROUTINE LOAD;

## keyword
RESUME,ROUTINE,LOAD

RESUME,ALL,ROUTINE,LOAD

8 changes: 8 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
@@ -1666,13 +1666,21 @@ pause_routine_load_stmt ::=
{:
RESULT = new PauseRoutineLoadStmt(jobLabel);
:}
| KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD
{:
RESULT = new PauseRoutineLoadStmt(null);
:}
;

resume_routine_load_stmt ::=
KW_RESUME KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
{:
RESULT = new ResumeRoutineLoadStmt(jobLabel);
:}
| KW_RESUME KW_ALL KW_ROUTINE KW_LOAD
{:
RESULT = new ResumeRoutineLoadStmt(null);
:}
;

stop_routine_load_stmt ::=
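
Both new grammar alternatives reuse the existing statement classes and simply pass a null job label; downstream code reads that null, via the new isAll() accessor, as "apply to every routine load job in the current database". A minimal sketch of the convention follows; the LabelName(String db, String label) constructor shape is an assumption made for illustration, while the statement classes and isAll() come from this change.

```java
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;

// Sketch only: how the two SQL forms map onto the statement objects built by the parser.
public class RoutineLoadAllSketch {
    public static void main(String[] args) {
        // "PAUSE ROUTINE LOAD FOR test1" -> a concrete label, a single job
        PauseRoutineLoadStmt single = new PauseRoutineLoadStmt(new LabelName("example_db", "test1"));
        System.out.println(single.isAll()); // false

        // "PAUSE ALL ROUTINE LOAD" -> null label, every non-final job in the current database
        PauseRoutineLoadStmt all = new PauseRoutineLoadStmt(null);
        System.out.println(all.isAll());    // true

        // "RESUME ALL ROUTINE LOAD" follows the same null-label convention
        ResumeRoutineLoadStmt resumeAll = new ResumeRoutineLoadStmt(null);
        System.out.println(resumeAll.isAll()); // true
    }
}
```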
@@ -17,9 +17,13 @@

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;

import com.google.common.base.Strings;

/*
Pause routine load by name

@@ -29,22 +33,35 @@
public class PauseRoutineLoadStmt extends DdlStmt {

private final LabelName labelName;
private String db;

public PauseRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}

public boolean isAll() {
return labelName == null;
}

public String getName() {
return labelName.getLabelName();
}

public String getDbFullName(){
return labelName.getDbName();
return db;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
if (labelName != null) {
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
}
}
}
@@ -17,9 +17,13 @@

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;

import com.google.common.base.Strings;

/*
Resume routine load job by name

@@ -29,22 +33,35 @@
public class ResumeRoutineLoadStmt extends DdlStmt{

private final LabelName labelName;
private String db;

public ResumeRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}

public boolean isAll() {
return labelName == null;
}

public String getName() {
return labelName.getLabelName();
}

public String getDbFullName() {
return labelName.getDbName();
return db;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
if (labelName != null) {
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
}
}
}
@@ -232,34 +232,93 @@ public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
return routineLoadJob;
}

// get all jobs whose state is not final from the specified database
public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
throws MetaNotFoundException, DdlException, AnalysisException {

List<RoutineLoadJob> result = Lists.newArrayList();
Database database = Catalog.getCurrentCatalog().getDb(dbName);
if (database == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
long dbId = database.getId();
Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId);
if (jobMap == null) {
// return empty result
return result;
}

for (List<RoutineLoadJob> jobs : jobMap.values()) {
for (RoutineLoadJob job : jobs) {
if (!job.getState().isFinalState()) {
String tableName = job.getTableName();
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbName, tableName, PrivPredicate.LOAD)) {
continue;
}
result.add(job);
}
}
}

return result;
}

public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (pauseRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}

routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
} catch (UserException e) {
LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
continue;
}
}
}

public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());

routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());

List<RoutineLoadJob> jobs = Lists.newArrayList();
if (resumeRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}

for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());
} catch (UserException e) {
LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
continue;
}
}
}

public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
@@ -913,4 +913,68 @@ public void testAlterRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineL

Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState());
}

@Test
public void testPauseAndResumeAllRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt,
@Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt,
@Mocked Catalog catalog,
@Mocked Database database,
@Mocked PaloAuth paloAuth,
@Mocked ConnectContext connectContext) throws UserException {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();

List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList();
RoutineLoadJob routineLoadJob1 = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob1, "id", 1000L);
routineLoadJobList1.add(routineLoadJob1);

List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList();
RoutineLoadJob routineLoadJob2 = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob2, "id", 1002L);
routineLoadJobList2.add(routineLoadJob2);

nameToRoutineLoadJob.put("job1", routineLoadJobList1);
nameToRoutineLoadJob.put("job2", routineLoadJobList2);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);

Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState());

new Expectations() {
{
pauseRoutineLoadStmt.isAll();
minTimes = 0;
result = true;
pauseRoutineLoadStmt.getDbFullName();
minTimes = 0;
result = "";
catalog.getDb("");
minTimes = 0;
result = database;
database.getId();
minTimes = 0;
result = 1L;
catalog.getAuth();
minTimes = 0;
result = paloAuth;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
resumeRoutineLoadStmt.isAll();
minTimes = 0;
result = true;
}
};

routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob2.getState());

routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState());
}
}