diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index 05765276e3dd5c..4fda341e115cb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -24,12 +24,17 @@ import com.google.common.base.Strings; +// CANCEL LOAD statement used to cancel load job. +// +// syntax: +// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") public class CancelLoadStmt extends DdlStmt { private String dbName; private String label; private Expr whereClause; + private boolean isAccurateMatch; public String getDbName() { return dbName; @@ -42,6 +47,11 @@ public String getLabel() { public CancelLoadStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; + this.isAccurateMatch = false; + } + + public boolean isAccurateMatch() { + return isAccurateMatch; } @Override @@ -68,10 +78,17 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { if (whereClause instanceof BinaryPredicate) { BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; + isAccurateMatch = true; if (binaryPredicate.getOp() != Operator.EQ) { valid = false; break; } + } else if (whereClause instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) whereClause; + if (likePredicate.getOp() != LikePredicate.Operator.LIKE) { + valid = false; + break; + } } else { valid = false; break; @@ -101,7 +118,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } while (false); if (!valid) { - throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\""); + throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," + + " or LABEL LIKE \"matcher\""); } } @@ -119,4 +137,9 @@ public String toSql() { return stringBuilder.toString(); } + @Override + public String toString() { + return toSql(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index bbc549adc7eca5..ded704cb8d935f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -107,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionStatus; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -133,6 +132,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class Load { private static final Logger LOG = LogManager.getLogger(Load.class); @@ -1545,7 +1545,7 @@ private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, bo return false; } - public boolean isLabelExist(String dbName, String label) throws DdlException { + public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException { // get load job and check state Database db = Catalog.getCurrentCatalog().getDb(dbName); if (db == null) { @@ -1557,8 +1557,19 @@ public boolean isLabelExist(String dbName, String label) throws DdlException { if (labelToLoadJobs == null) { return false; } - List loadJobs = labelToLoadJobs.get(label); - if (loadJobs == null) { + List loadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (labelToLoadJobs.containsKey(labelValue)) { + loadJobs.addAll(labelToLoadJobs.get(labelValue)); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(labelValue)) { + loadJobs.addAll(entry.getValue()); + } + } + } + if (loadJobs.isEmpty()) { return false; } if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) { @@ -1570,6 +1581,93 @@ public boolean isLabelExist(String dbName, String label) throws DdlException { } } + public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException { + // get params + String dbName = stmt.getDbName(); + String label = stmt.getLabel(); + + // get load job and check state + Database db = Catalog.getCurrentCatalog().getDb(dbName); + if (db == null) { + throw new DdlException("Db does not exist. name: " + dbName); + } + // List of load jobs waiting to be cancelled + List loadJobs = Lists.newArrayList(); + readLock(); + try { + Map> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId()); + if (labelToLoadJobs == null) { + throw new DdlException("Load job does not exist"); + } + + // get jobs by label + List matchLoadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (labelToLoadJobs.containsKey(label)) { + matchLoadJobs.addAll(labelToLoadJobs.get(label)); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(label)) { + matchLoadJobs.addAll(entry.getValue()); + } + } + } + + if (matchLoadJobs.isEmpty()) { + throw new DdlException("Load job does not exist"); + } + + // check state here + List uncompletedLoadJob = matchLoadJobs.stream().filter(job -> { + JobState state = job.getState(); + return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED; + }).collect(Collectors.toList()); + if (uncompletedLoadJob.isEmpty()) { + throw new DdlException("There is no uncompleted job which label " + + (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); + } + loadJobs.addAll(uncompletedLoadJob); + } finally { + readUnlock(); + } + + // check auth here, cause we need table info + Set tableNames = Sets.newHashSet(); + for (LoadJob loadJob : loadJobs) { + tableNames.addAll(loadJob.getTableNames()); + } + + if (tableNames.isEmpty()) { + if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); + } + } else { + for (String tblName : tableNames) { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tblName); + } + } + } + + // cancel job + for (LoadJob loadJob : loadJobs) { + List failedMsg = Lists.newArrayList(); + boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg); + if (!ok) { + throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + + "label=[" + loadJob.getLabel() + "] failed msg=" + + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0))); + } + } + + return true; + } + public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { // get params String dbName = stmt.getDbName(); @@ -1609,16 +1707,16 @@ public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { if (tableNames.isEmpty()) { // forward compatibility if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); } } else { for (String tblName : tableNames) { if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), tblName); + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tblName); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 5f6d6c31a1b966..763f50ec545101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -284,6 +284,62 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); } + public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException { + Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName()); + if (db == null) { + throw new DdlException("Db does not exist. name: " + stmt.getDbName()); + } + + // List of load jobs waiting to be cancelled + List loadJobs = Lists.newArrayList(); + readLock(); + try { + Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId()); + if (labelToLoadJobs == null) { + throw new DdlException("Load job does not exist"); + } + + // get jobs by label + List matchLoadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (labelToLoadJobs.containsKey(stmt.getLabel())) { + matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel())); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(stmt.getLabel())) { + matchLoadJobs.addAll(entry.getValue()); + } + } + } + + if (matchLoadJobs.isEmpty()) { + throw new DdlException("Load job does not exist"); + } + + // check state here + List uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()) + .collect(Collectors.toList()); + if (uncompletedLoadJob.isEmpty()) { + throw new DdlException("There is no uncompleted job which label " + + (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); + } + + loadJobs.addAll(uncompletedLoadJob); + } finally { + readUnlock(); + } + + for (LoadJob loadJob : loadJobs) { + try { + loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel")); + } catch (DdlException e) { + throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + + "label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage()); + } + } + } + public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException { Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName()); if (db == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index dfa0b715c03bb0..7eeb5f8802e9d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -132,11 +132,17 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { - if (catalog.getLoadInstance().isLabelExist( - ((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt) ddlStmt).getLabel())) { - catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt); - } else { - catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); + boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch(); + boolean isLabelExist = catalog.getLoadInstance().isLabelExist( + ((CancelLoadStmt) ddlStmt).getDbName(), + ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch); + if (isLabelExist) { + catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt, + isAccurateMatch); + } + if (!isLabelExist || isAccurateMatch) { + catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt, + isAccurateMatch); } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java new file mode 100644 index 00000000000000..5104b713682aed --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -0,0 +1,88 @@ +package org.apache.doris.analysis; + +import mockit.Expectations; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FakeCatalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CancelLoadStmtTest { + private Analyzer analyzer; + private Catalog catalog; + + FakeCatalog fakeCatalog; + + @Before + public void setUp() { + fakeCatalog = new FakeCatalog(); + + catalog = AccessTestUtil.fetchAdminCatalog(); + FakeCatalog.setCatalog(catalog); + + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + new Expectations(analyzer) { + { + analyzer.getDefaultDb(); + minTimes = 0; + result = "testCluster:testDb"; + + analyzer.getQualifiedUser(); + minTimes = 0; + result = "testCluster:testUser"; + + analyzer.getClusterName(); + minTimes = 0; + result = "testCluster"; + + analyzer.getCatalog(); + minTimes = 0; + result = catalog; + } + }; + } + + @Test + public void testNormal() throws UserException, AnalysisException { + SlotRef slotRef = new SlotRef(null, "label"); + StringLiteral stringLiteral = new StringLiteral("doris_test_label"); + + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); + stmt.analyze(analyzer); + Assert.assertTrue(stmt.isAccurateMatch()); + Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString()); + + LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral); + stmt = new CancelLoadStmt(null, likePredicate); + stmt.analyze(analyzer); + Assert.assertFalse(stmt.isAccurateMatch()); + Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString()); + } + + @Test(expected = AnalysisException.class) + public void testNoDb() throws UserException, AnalysisException { + SlotRef slotRef = new SlotRef(null, "label"); + StringLiteral stringLiteral = new StringLiteral("doris_test_label"); + new Expectations(analyzer) { + { + analyzer.getDefaultDb(); + minTimes = 0; + result = ""; + + analyzer.getClusterName(); + minTimes = 0; + result = "testCluster"; + } + }; + + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); + stmt.analyze(analyzer); + Assert.fail("No exception throws."); + } +}