Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@

import com.google.common.base.Strings;

// CANCEL LOAD statement used to cancel load job.
//
// syntax:
// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
public class CancelLoadStmt extends DdlStmt {

private String dbName;
private String label;

private Expr whereClause;
private boolean isAccurateMatch;

public String getDbName() {
return dbName;
Expand All @@ -42,6 +47,11 @@ public String getLabel() {
public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
this.isAccurateMatch = false;
}

public boolean isAccurateMatch() {
return isAccurateMatch;
}

@Override
Expand All @@ -68,10 +78,17 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {

if (whereClause instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
isAccurateMatch = true;
if (binaryPredicate.getOp() != Operator.EQ) {
valid = false;
break;
}
} else if (whereClause instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) whereClause;
if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
valid = false;
break;
}
} else {
valid = false;
break;
Expand Down Expand Up @@ -101,7 +118,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
} while (false);

if (!valid) {
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"");
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
" or LABEL LIKE \"matcher\"");
}
}

Expand All @@ -119,4 +137,9 @@ public String toSql() {
return stringBuilder.toString();
}

@Override
public String toString() {
return toSql();
}

}
114 changes: 106 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -133,6 +132,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class Load {
private static final Logger LOG = LogManager.getLogger(Load.class);
Expand Down Expand Up @@ -1545,7 +1545,7 @@ private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, bo
return false;
}

public boolean isLabelExist(String dbName, String label) throws DdlException {
public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException {
// get load job and check state
Database db = Catalog.getCurrentCatalog().getDb(dbName);
if (db == null) {
Expand All @@ -1557,8 +1557,19 @@ public boolean isLabelExist(String dbName, String label) throws DdlException {
if (labelToLoadJobs == null) {
return false;
}
List<LoadJob> loadJobs = labelToLoadJobs.get(label);
if (loadJobs == null) {
List<LoadJob> loadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(labelValue)) {
loadJobs.addAll(labelToLoadJobs.get(labelValue));
}
} else {
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (entry.getKey().contains(labelValue)) {
loadJobs.addAll(entry.getValue());
}
}
}
if (loadJobs.isEmpty()) {
return false;
}
if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) {
Expand All @@ -1570,6 +1581,93 @@ public boolean isLabelExist(String dbName, String label) throws DdlException {
}
}

public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException {
// get params
String dbName = stmt.getDbName();
String label = stmt.getLabel();

// get load job and check state
Database db = Catalog.getCurrentCatalog().getDb(dbName);
if (db == null) {
throw new DdlException("Db does not exist. name: " + dbName);
}
// List of load jobs waiting to be cancelled
List<LoadJob> loadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

// get jobs by label
List<LoadJob> matchLoadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(label)) {
matchLoadJobs.addAll(labelToLoadJobs.get(label));
}
} else {
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (entry.getKey().contains(label)) {
matchLoadJobs.addAll(entry.getValue());
}
}
}

if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}

// check state here
List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
JobState state = job.getState();
return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
}).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job which label " +
(isAccurateMatch ? "is " : "like ") + stmt.getLabel());
}
loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}

// check auth here, cause we need table info
Set<String> tableNames = Sets.newHashSet();
for (LoadJob loadJob : loadJobs) {
tableNames.addAll(loadJob.getTableNames());
}

if (tableNames.isEmpty()) {
if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), tblName);
}
}
}

// cancel job
for (LoadJob loadJob : loadJobs) {
List<String> failedMsg = Lists.newArrayList();
boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
if (!ok) {
throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
"label=[" + loadJob.getLabel() + "] failed msg=" +
(failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
}
}

return true;
}

public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
// get params
String dbName = stmt.getDbName();
Expand Down Expand Up @@ -1609,16 +1707,16 @@ public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
if (tableNames.isEmpty()) {
// forward compatibility
if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), tblName);
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), tblName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,62 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
}

public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException {
Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName());
if (db == null) {
throw new DdlException("Db does not exist. name: " + stmt.getDbName());
}

// List of load jobs waiting to be cancelled
List<LoadJob> loadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

// get jobs by label
List<LoadJob> matchLoadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(stmt.getLabel())) {
matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel()));
}
} else {
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (entry.getKey().contains(stmt.getLabel())) {
matchLoadJobs.addAll(entry.getValue());
}
}
}

if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}

// check state here
List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone())
.collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job which label " +
(isAccurateMatch ? "is " : "like ") + stmt.getLabel());
}

loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}

for (LoadJob loadJob : loadJobs) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
"label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage());
}
}
}

public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName());
if (db == null) {
Expand Down
16 changes: 11 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,17 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelLoadStmt) {
if (catalog.getLoadInstance().isLabelExist(
((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt) ddlStmt).getLabel())) {
catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt);
} else {
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch();
boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
((CancelLoadStmt) ddlStmt).getDbName(),
((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
if (isLabelExist) {
catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt,
isAccurateMatch);
}
if (!isLabelExist || isAccurateMatch) {
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt,
isAccurateMatch);
}
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
Expand Down
Loading