From 2967f5b1d791ce381ba8ad8473ca4d7d9862b9d5 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 11:32:47 +0800 Subject: [PATCH 1/5] save code --- .../apache/doris/analysis/CancelLoadStmt.java | 20 ++- .../main/java/org/apache/doris/load/Load.java | 127 ++++++++++++++++-- .../apache/doris/load/loadv2/LoadManager.java | 56 ++++++++ .../java/org/apache/doris/qe/DdlExecutor.java | 10 +- 4 files changed, 201 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index 05765276e3dd5c..d4fb22d095ad3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -24,12 +24,17 @@ import com.google.common.base.Strings; +// CANCEL LOAD statement used to cancel load job. +// +// syntax: +// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") public class CancelLoadStmt extends DdlStmt { private String dbName; private String label; private Expr whereClause; + private boolean isAccurateMatch; public String getDbName() { return dbName; @@ -42,6 +47,11 @@ public String getLabel() { public CancelLoadStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; + this.isAccurateMatch = false; + } + + public boolean isAccurateMatch() { + return isAccurateMatch; } @Override @@ -69,6 +79,13 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { if (whereClause instanceof BinaryPredicate) { BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; if (binaryPredicate.getOp() != Operator.EQ) { + valid = false; + isAccurateMatch = true; + break; + } + } else if (whereClause instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) whereClause; + if (likePredicate.getOp() != LikePredicate.Operator.LIKE) { valid = false; break; } @@ -101,7 +118,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } while (false); if (!valid) { - throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\""); + throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," + + " or LABEL LIKE \"matcher\""); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index bbc549adc7eca5..80a3b3ff286ca2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -107,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionStatus; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -133,6 +132,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class Load { private static final Logger LOG = LogManager.getLogger(Load.class); @@ -1545,7 +1545,7 @@ private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, bo return false; } - public boolean isLabelExist(String dbName, String label) throws DdlException { + public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException { // get load job and check state Database db = Catalog.getCurrentCatalog().getDb(dbName); if (db == null) { @@ -1557,8 +1557,19 @@ public boolean isLabelExist(String dbName, String label) throws DdlException { if (labelToLoadJobs == null) { return false; } - List loadJobs = labelToLoadJobs.get(label); - if (loadJobs == null) { + List loadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (labelToLoadJobs.containsKey(labelValue)) { + loadJobs.addAll(labelToLoadJobs.get(labelValue)); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(labelValue)) { + loadJobs.addAll(entry.getValue()); + } + } + } + if (loadJobs.isEmpty()) { return false; } if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) { @@ -1570,6 +1581,105 @@ public boolean isLabelExist(String dbName, String label) throws DdlException { } } + public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException { + // get params + String dbName = stmt.getDbName(); + String label = stmt.getLabel(); + + // get load job and check state + Database db = Catalog.getCurrentCatalog().getDb(dbName); + if (db == null) { + throw new DdlException("Db does not exist. name: " + dbName); + } + // List of load jobs waiting to be cancelled + List loadJobs = Lists.newArrayList(); + readLock(); + try { + Map> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId()); + if (labelToLoadJobs == null) { + throw new DdlException("Load job does not exist"); + } + + // get jobs by label + List matchLoadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (!labelToLoadJobs.containsKey(label)) { + matchLoadJobs.addAll(labelToLoadJobs.get(label)); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(label)) { + matchLoadJobs.addAll(entry.getValue()); + } + } + } + + if (matchLoadJobs.isEmpty()) { + throw new DdlException("Load job does not exist"); + } + + // check state here + if (isAccurateMatch) { + // only the last one should be running + LoadJob job = matchLoadJobs.get(matchLoadJobs.size() - 1); + JobState state = job.getState(); + if (state == JobState.CANCELLED) { + throw new DdlException("Load job has been cancelled"); + } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) { + throw new DdlException("Load job has been finished"); + } + loadJobs.add(job); + } else { + List uncompletedLoadJob = matchLoadJobs.stream().filter(job -> { + JobState state = job.getState(); + return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED; + }).collect(Collectors.toList()); + if (uncompletedLoadJob.isEmpty()) { + throw new DdlException("Load jobs which label like " + stmt.getLabel() + + " have all been cancelled or finished"); + } + loadJobs.addAll(uncompletedLoadJob); + } + } finally { + readUnlock(); + } + + // check auth here, cause we need table info + Set tableNames = Sets.newHashSet(); + for (LoadJob loadJob : loadJobs) { + tableNames.addAll(loadJob.getTableNames()); + } + + if (tableNames.isEmpty()) { + if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); + } + } else { + for (String tblName : tableNames) { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tblName); + } + } + } + + // cancel job + for (LoadJob loadJob : loadJobs) { + List failedMsg = Lists.newArrayList(); + boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg); + if (!ok) { + throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + + "label=[" + loadJob.getLabel() + "] failed msg=" + + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0))); + } + } + + return true; + } + public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { // get params String dbName = stmt.getDbName(); @@ -1609,16 +1719,16 @@ public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { if (tableNames.isEmpty()) { // forward compatibility if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); } } else { for (String tblName : tableNames) { if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), tblName); + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tblName); } } } @@ -1632,6 +1742,7 @@ public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { return true; } + public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) { return cancelLoadJob(job, cancelType, msg, null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 5f6d6c31a1b966..d49be5607b2786 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -284,6 +284,62 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); } + public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException { + Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName()); + if (db == null) { + throw new DdlException("Db does not exist. name: " + stmt.getDbName()); + } + + // List of load jobs waiting to be cancelled + List loadJobs = Lists.newArrayList(); + readLock(); + try { + Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId()); + if (labelToLoadJobs == null) { + throw new DdlException("Load job does not exist"); + } + + // get jobs by label + List matchLoadJobs = Lists.newArrayList(); + if (isAccurateMatch) { + if (!labelToLoadJobs.containsKey(stmt.getLabel())) { + matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel())); + } + } else { + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(stmt.getLabel())) { + matchLoadJobs.addAll(entry.getValue()); + } + } + } + + if (matchLoadJobs.isEmpty()) { + throw new DdlException("Load job does not exist"); + } + + // check state here + List uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()) + .collect(Collectors.toList()); + if (uncompletedLoadJob.isEmpty()) { + throw new DdlException("There is no uncompleted job which label " + + (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); + } + + loadJobs.addAll(uncompletedLoadJob); + } finally { + readUnlock(); + } + + for (LoadJob loadJob : loadJobs) { + try { + loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel")); + } catch (DdlException e) { + throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + + "label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage()); + } + } + } + public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException { Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName()); if (db == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index dfa0b715c03bb0..26d2d6b8d62c60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -133,10 +133,14 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { } } else if (ddlStmt instanceof CancelLoadStmt) { if (catalog.getLoadInstance().isLabelExist( - ((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt) ddlStmt).getLabel())) { - catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt); + ((CancelLoadStmt) ddlStmt).getDbName(), + ((CancelLoadStmt) ddlStmt).getLabel(), + ((CancelLoadStmt) ddlStmt).isAccurateMatch())) { + catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt, + ((CancelLoadStmt) ddlStmt).isAccurateMatch()); } else { - catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); + catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt, + ((CancelLoadStmt) ddlStmt).isAccurateMatch()); } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); From 1f9ac135e67826ea324aa1acbb390f6b637ceaa5 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 14:05:14 +0800 Subject: [PATCH 2/5] save code --- .../org/apache/doris/analysis/CancelLoadStmt.java | 2 +- .../src/main/java/org/apache/doris/load/Load.java | 2 +- .../org/apache/doris/load/loadv2/LoadManager.java | 2 +- .../java/org/apache/doris/qe/DdlExecutor.java | 15 +++++++++------ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index d4fb22d095ad3e..895180aad492d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -78,9 +78,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { if (whereClause instanceof BinaryPredicate) { BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; + isAccurateMatch = true; if (binaryPredicate.getOp() != Operator.EQ) { valid = false; - isAccurateMatch = true; break; } } else if (whereClause instanceof LikePredicate) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 80a3b3ff286ca2..8f7ae756a69ae6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1603,7 +1603,7 @@ public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throw // get jobs by label List matchLoadJobs = Lists.newArrayList(); if (isAccurateMatch) { - if (!labelToLoadJobs.containsKey(label)) { + if (labelToLoadJobs.containsKey(label)) { matchLoadJobs.addAll(labelToLoadJobs.get(label)); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d49be5607b2786..763f50ec545101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -302,7 +302,7 @@ public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws D // get jobs by label List matchLoadJobs = Lists.newArrayList(); if (isAccurateMatch) { - if (!labelToLoadJobs.containsKey(stmt.getLabel())) { + if (labelToLoadJobs.containsKey(stmt.getLabel())) { matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel())); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 26d2d6b8d62c60..9a1ea11f8341e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -132,15 +132,18 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { - if (catalog.getLoadInstance().isLabelExist( + boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch(); + boolean isLabelExist = false; + isLabelExist = catalog.getLoadInstance().isLabelExist( ((CancelLoadStmt) ddlStmt).getDbName(), - ((CancelLoadStmt) ddlStmt).getLabel(), - ((CancelLoadStmt) ddlStmt).isAccurateMatch())) { + ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch); + if (isLabelExist) { catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt, - ((CancelLoadStmt) ddlStmt).isAccurateMatch()); - } else { + isAccurateMatch); + } + if (!isLabelExist || isAccurateMatch) { catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt, - ((CancelLoadStmt) ddlStmt).isAccurateMatch()); + isAccurateMatch); } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); From fd1e6a2809423ed0e372d862790786e76091aebf Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 15:00:38 +0800 Subject: [PATCH 3/5] add ut --- .../apache/doris/analysis/CancelLoadStmt.java | 5 ++ .../java/org/apache/doris/qe/DdlExecutor.java | 3 +- .../doris/analysis/CancelLoadStmtTest.java | 88 +++++++++++++++++++ 3 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index 895180aad492d5..4fda341e115cb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -137,4 +137,9 @@ public String toSql() { return stringBuilder.toString(); } + @Override + public String toString() { + return toSql(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 9a1ea11f8341e3..7eeb5f8802e9d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -133,8 +133,7 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { } } else if (ddlStmt instanceof CancelLoadStmt) { boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch(); - boolean isLabelExist = false; - isLabelExist = catalog.getLoadInstance().isLabelExist( + boolean isLabelExist = catalog.getLoadInstance().isLabelExist( ((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch); if (isLabelExist) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java new file mode 100644 index 00000000000000..5104b713682aed --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -0,0 +1,88 @@ +package org.apache.doris.analysis; + +import mockit.Expectations; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FakeCatalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CancelLoadStmtTest { + private Analyzer analyzer; + private Catalog catalog; + + FakeCatalog fakeCatalog; + + @Before + public void setUp() { + fakeCatalog = new FakeCatalog(); + + catalog = AccessTestUtil.fetchAdminCatalog(); + FakeCatalog.setCatalog(catalog); + + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + new Expectations(analyzer) { + { + analyzer.getDefaultDb(); + minTimes = 0; + result = "testCluster:testDb"; + + analyzer.getQualifiedUser(); + minTimes = 0; + result = "testCluster:testUser"; + + analyzer.getClusterName(); + minTimes = 0; + result = "testCluster"; + + analyzer.getCatalog(); + minTimes = 0; + result = catalog; + } + }; + } + + @Test + public void testNormal() throws UserException, AnalysisException { + SlotRef slotRef = new SlotRef(null, "label"); + StringLiteral stringLiteral = new StringLiteral("doris_test_label"); + + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); + stmt.analyze(analyzer); + Assert.assertTrue(stmt.isAccurateMatch()); + Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString()); + + LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral); + stmt = new CancelLoadStmt(null, likePredicate); + stmt.analyze(analyzer); + Assert.assertFalse(stmt.isAccurateMatch()); + Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString()); + } + + @Test(expected = AnalysisException.class) + public void testNoDb() throws UserException, AnalysisException { + SlotRef slotRef = new SlotRef(null, "label"); + StringLiteral stringLiteral = new StringLiteral("doris_test_label"); + new Expectations(analyzer) { + { + analyzer.getDefaultDb(); + minTimes = 0; + result = ""; + + analyzer.getClusterName(); + minTimes = 0; + result = "testCluster"; + } + }; + + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); + stmt.analyze(analyzer); + Assert.fail("No exception throws."); + } +} From 655c006cf32541fbc8bc4d22cbcd8ab43d0e0d6f Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 15:15:26 +0800 Subject: [PATCH 4/5] fix --- fe/fe-core/src/main/java/org/apache/doris/load/Load.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 8f7ae756a69ae6..68dba2c3b37b19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1742,7 +1742,6 @@ public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { return true; } - public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) { return cancelLoadJob(job, cancelType, msg, null); } From 0f9b9cf94e49ff0646fc0cb66943bf8fd254d1c4 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 30 Sep 2020 01:12:09 +0800 Subject: [PATCH 5/5] unify logic --- .../main/java/org/apache/doris/load/Load.java | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 68dba2c3b37b19..ded704cb8d935f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1619,27 +1619,15 @@ public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throw } // check state here - if (isAccurateMatch) { - // only the last one should be running - LoadJob job = matchLoadJobs.get(matchLoadJobs.size() - 1); + List uncompletedLoadJob = matchLoadJobs.stream().filter(job -> { JobState state = job.getState(); - if (state == JobState.CANCELLED) { - throw new DdlException("Load job has been cancelled"); - } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) { - throw new DdlException("Load job has been finished"); - } - loadJobs.add(job); - } else { - List uncompletedLoadJob = matchLoadJobs.stream().filter(job -> { - JobState state = job.getState(); - return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED; - }).collect(Collectors.toList()); - if (uncompletedLoadJob.isEmpty()) { - throw new DdlException("Load jobs which label like " + stmt.getLabel() + - " have all been cancelled or finished"); - } - loadJobs.addAll(uncompletedLoadJob); + return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED; + }).collect(Collectors.toList()); + if (uncompletedLoadJob.isEmpty()) { + throw new DdlException("There is no uncompleted job which label " + + (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); } + loadJobs.addAll(uncompletedLoadJob); } finally { readUnlock(); }