From 0c9fa3ee100ec87a0943c4b1db8884821ae44cfe Mon Sep 17 00:00:00 2001 From: stalary Date: Thu, 12 May 2022 23:21:03 +0800 Subject: [PATCH 1/2] MOD: cancel load support state --- .../apache/doris/analysis/CancelLoadStmt.java | 150 ++++++++------- .../apache/doris/common/CaseSensibility.java | 1 + .../main/java/org/apache/doris/load/Load.java | 178 ------------------ .../apache/doris/load/loadv2/LoadManager.java | 75 +++++--- .../java/org/apache/doris/qe/DdlExecutor.java | 13 +- .../doris/analysis/CancelLoadStmtTest.java | 2 - 6 files changed, 131 insertions(+), 288 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index 545ed9d18768e3..82aa121c3f91c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -23,6 +23,11 @@ import org.apache.doris.common.UserException; import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import lombok.Getter; + +import java.util.List; + // CANCEL LOAD statement used to cancel load job. // @@ -30,28 +35,88 @@ // CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") public class CancelLoadStmt extends DdlStmt { + @Getter private String dbName; + + @Getter + private CompoundPredicate.Operator operator; + + @Getter private String label; - private Expr whereClause; - private boolean isAccurateMatch; + @Getter + private String state; - public String getDbName() { - return dbName; - } + private Expr whereClause; - public String getLabel() { - return label; - } + private static final List SUPPORT_COLUMNS = Lists.newArrayList("label", "state"); public CancelLoadStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; - this.isAccurateMatch = false; } - public boolean isAccurateMatch() { - return isAccurateMatch; + private void checkColumn(Expr expr, boolean like) throws AnalysisException { + String inputCol = ((SlotRef) expr.getChild(0)).getColumnName().toLowerCase(); + if (!SUPPORT_COLUMNS.contains(inputCol)) { + throw new AnalysisException("Current not support " + inputCol); + } + if (!(expr.getChild(1) instanceof StringLiteral)) { + throw new AnalysisException("Value must is string"); + } + + String inputValue = expr.getChild(1).getStringValue(); + if (Strings.isNullOrEmpty(inputValue)) { + throw new AnalysisException("Value can't is null"); + } + if (like && !inputValue.contains("%")) { + inputValue = "%" + inputValue + "%"; + } + if (inputCol.equals("label")) { + label = inputValue; + } + if (inputCol.equals("state")) { + state = inputValue; + } + } + + private void likeCheck(Expr expr) throws AnalysisException { + if (expr instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) expr; + boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp()); + if (!like) { + throw new AnalysisException("Not support REGEXP"); + } + checkColumn(expr, true); + } + } + + private void binaryCheck(Expr expr) throws AnalysisException { + if (expr instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; + if (!Operator.EQ.equals(binaryPredicate.getOp())) { + throw new AnalysisException("Only support equal or like"); + } + checkColumn(expr, false); + } + } + + private void compoundCheck(Expr expr) throws AnalysisException { + if (expr == null) { + throw new AnalysisException("Where clause can't is null"); + } + if (expr instanceof CompoundPredicate) { + // current only support label and state + CompoundPredicate compoundPredicate = (CompoundPredicate) whereClause; + for (int i = 0; i < 2; i++) { + Expr child = compoundPredicate.getChild(i); + if (child instanceof CompoundPredicate) { + throw new AnalysisException("Current only support label and state"); + } + likeCheck(expr); + binaryCheck(expr); + } + } } @Override @@ -67,63 +132,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } // check auth after we get real load job - - // analyze expr if not null - boolean valid = true; - do { - if (whereClause == null) { - valid = false; - break; - } - - if (whereClause instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; - isAccurateMatch = true; - if (binaryPredicate.getOp() != Operator.EQ) { - valid = false; - break; - } - } else if (whereClause instanceof LikePredicate) { - LikePredicate likePredicate = (LikePredicate) whereClause; - if (likePredicate.getOp() != LikePredicate.Operator.LIKE) { - valid = false; - break; - } - } else { - valid = false; - break; - } - - // left child - if (!(whereClause.getChild(0) instanceof SlotRef)) { - valid = false; - break; - } - if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) { - valid = false; - break; - } - - // right child - if (!(whereClause.getChild(1) instanceof StringLiteral)) { - valid = false; - break; - } - - label = ((StringLiteral) whereClause.getChild(1)).getStringValue(); - if (Strings.isNullOrEmpty(label)) { - valid = false; - break; - } - if (!isAccurateMatch && !label.contains("%")) { - label = "%" + label + "%"; - } - } while (false); - - if (!valid) { - throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," + - " or LABEL LIKE \"matcher\""); - } + // analyze expr + compoundCheck(whereClause); } @Override @@ -131,11 +141,11 @@ public String toSql() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("CANCEL LOAD "); if (!Strings.isNullOrEmpty(dbName)) { - stringBuilder.append("FROM " + dbName); + stringBuilder.append("FROM ").append(dbName); } if (whereClause != null) { - stringBuilder.append(" WHERE " + whereClause.toSql()); + stringBuilder.append(" WHERE ").append(whereClause.toSql()); } return stringBuilder.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java index ff4ebdf775070d..41c20b551ab1f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java @@ -28,6 +28,7 @@ public enum CaseSensibility { ROLE(false), HOST(false), LABEL(false), + STATE(false), VARIABLES(true), RESOURCE(true), CONFIG(true), diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 2b151c0452bdb7..37f0ead0cb0318 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1616,184 +1616,6 @@ private boolean checkMultiLabelUsed(long dbId, String label, long timestamp) thr return false; } - public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException, AnalysisException { - // get load job and check state - Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName); - readLock(); - try { - Map> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId()); - if (labelToLoadJobs == null) { - return false; - } - List loadJobs = Lists.newArrayList(); - if (isAccurateMatch) { - if (labelToLoadJobs.containsKey(labelValue)) { - loadJobs.addAll(labelToLoadJobs.get(labelValue)); - } - } else { - PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); - for (Map.Entry> entry : labelToLoadJobs.entrySet()) { - if (matcher.match(entry.getKey())) { - loadJobs.addAll(entry.getValue()); - } - } - } - if (loadJobs.isEmpty()) { - return false; - } - if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) { - return false; - } - return true; - } finally { - readUnlock(); - } - } - - public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException { - // get params - String dbName = stmt.getDbName(); - String label = stmt.getLabel(); - - // get load job and check state - Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName); - // List of load jobs waiting to be cancelled - List loadJobs = Lists.newArrayList(); - readLock(); - try { - Map> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId()); - if (labelToLoadJobs == null) { - throw new DdlException("Load job does not exist"); - } - - // get jobs by label - List matchLoadJobs = Lists.newArrayList(); - if (isAccurateMatch) { - if (labelToLoadJobs.containsKey(label)) { - matchLoadJobs.addAll(labelToLoadJobs.get(label)); - } - } else { - PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); - for (Map.Entry> entry : labelToLoadJobs.entrySet()) { - if (matcher.match(entry.getKey())) { - loadJobs.addAll(entry.getValue()); - } - } - } - - if (matchLoadJobs.isEmpty()) { - throw new DdlException("Load job does not exist"); - } - - // check state here - List uncompletedLoadJob = matchLoadJobs.stream().filter(job -> { - JobState state = job.getState(); - return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED; - }).collect(Collectors.toList()); - if (uncompletedLoadJob.isEmpty()) { - throw new DdlException("There is no uncompleted job which label " + - (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); - } - loadJobs.addAll(uncompletedLoadJob); - } finally { - readUnlock(); - } - - // check auth here, cause we need table info - Set tableNames = Sets.newHashSet(); - for (LoadJob loadJob : loadJobs) { - tableNames.addAll(loadJob.getTableNames()); - } - - if (tableNames.isEmpty()) { - if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, - PrivPredicate.LOAD)) { - ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); - } - } else { - for (String tblName : tableNames) { - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, - PrivPredicate.LOAD)) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), dbName + ": " + tblName); - } - } - } - - // cancel job - for (LoadJob loadJob : loadJobs) { - List failedMsg = Lists.newArrayList(); - boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg); - if (!ok) { - throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + - "label=[" + loadJob.getLabel() + "] failed msg=" + - (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0))); - } - } - - return true; - } - - public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException { - // get params - String dbName = stmt.getDbName(); - String label = stmt.getLabel(); - - // get load job and check state - Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName); - LoadJob job; - readLock(); - try { - Map> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId()); - if (labelToLoadJobs == null) { - throw new DdlException("Load job does not exist"); - } - - List loadJobs = labelToLoadJobs.get(label); - if (loadJobs == null) { - throw new DdlException("Load job does not exist"); - } - // only the last one should be running - job = loadJobs.get(loadJobs.size() - 1); - JobState state = job.getState(); - if (state == JobState.CANCELLED) { - throw new DdlException("Load job has been cancelled"); - } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) { - throw new DdlException("Load job has been finished"); - } - } finally { - readUnlock(); - } - - // check auth here, cause we need table info - Set tableNames = job.getTableNames(); - if (tableNames.isEmpty()) { - // forward compatibility - if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, - PrivPredicate.LOAD)) { - ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD"); - } - } else { - for (String tblName : tableNames) { - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName, - PrivPredicate.LOAD)) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), dbName + ": " + tblName); - } - } - } - - // cancel job - List failedMsg = Lists.newArrayList(); - if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) { - throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0))); - } - - return true; - } - public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) { return cancelLoadJob(job, cancelType, msg, null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 4ac2a3550f46b9..cb3ce3fcdf257e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.CancelLoadStmt; +import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -50,6 +51,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -62,6 +65,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -301,51 +305,70 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); } - public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException { - Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName()); + private void addNeedCancelLoadJob(CancelLoadStmt stmt, List loadJobs, List matchLoadJobs) + throws AnalysisException { + String label = stmt.getLabel(); + if (StringUtils.isNotEmpty(label)) { + if (label.contains("%")) { + PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); + for (LoadJob loadJob : loadJobs) { + if (matcher.match(loadJob.getLabel())) { + matchLoadJobs.add(loadJob); + } + } + } else { + matchLoadJobs.addAll(loadJobs.stream().filter(job -> label.equals(job.getLabel())).collect(Collectors.toList())); + } + } + String state = stmt.getState(); + if (StringUtils.isNotEmpty(state)) { + if (state.contains("%")) { + PatternMatcher matcher = PatternMatcher.createMysqlPattern(state, CaseSensibility.STATE.getCaseSensibility()); + if (CompoundPredicate.Operator.AND.equals(stmt.getOperator())) { + matchLoadJobs.removeIf(loadJob -> matcher.match(loadJob.getState().name().toLowerCase())); + } else { + for (LoadJob loadJob : loadJobs) { + if (matcher.match(loadJob.getState().name().toLowerCase())) { + matchLoadJobs.add(loadJob); + } + } + } + } else { + if (CompoundPredicate.Operator.AND.equals(stmt.getOperator())) { + matchLoadJobs.removeIf(loadJob -> state.equals(loadJob.getState().name().toLowerCase())); + } else { + matchLoadJobs.addAll( + loadJobs.stream().filter(job -> state.equals(job.getState().name().toLowerCase())) + .collect(Collectors.toList())); + } + } + } + } + public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException { + Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName()); // List of load jobs waiting to be cancelled - List loadJobs = Lists.newArrayList(); + List matchLoadJobs = Lists.newArrayList(); readLock(); try { Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId()); if (labelToLoadJobs == null) { throw new DdlException("Load job does not exist"); } - - // get jobs by label - List matchLoadJobs = Lists.newArrayList(); - if (isAccurateMatch) { - if (labelToLoadJobs.containsKey(stmt.getLabel())) { - matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel())); - } - } else { - PatternMatcher matcher = PatternMatcher.createMysqlPattern(stmt.getLabel(), CaseSensibility.LABEL.getCaseSensibility()); - for (Map.Entry> entry : labelToLoadJobs.entrySet()) { - if (matcher.match(entry.getKey())) { - matchLoadJobs.addAll(entry.getValue()); - } - } - } - + addNeedCancelLoadJob(stmt, labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), matchLoadJobs); if (matchLoadJobs.isEmpty()) { throw new DdlException("Load job does not exist"); } - // check state here List uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()) .collect(Collectors.toList()); if (uncompletedLoadJob.isEmpty()) { - throw new DdlException("There is no uncompleted job which label " + - (isAccurateMatch ? "is " : "like ") + stmt.getLabel()); + throw new DdlException("There is no uncompleted job"); } - - loadJobs.addAll(uncompletedLoadJob); } finally { readUnlock(); } - - for (LoadJob loadJob : loadJobs) { + for (LoadJob loadJob : matchLoadJobs) { try { loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel")); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 5904848b552e50..174f505dac901b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -170,18 +170,7 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { - boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch(); - boolean isLabelExist = catalog.getLoadInstance().isLabelExist( - ((CancelLoadStmt) ddlStmt).getDbName(), - ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch); - if (isLabelExist) { - catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt, - isAccurateMatch); - } - if (!isLabelExist || isAccurateMatch) { - catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt, - isAccurateMatch); - } + catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); } else if (ddlStmt instanceof CreateRoutineLoadStmt) { catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof PauseRoutineLoadStmt) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java index 4ed21d7ae2572c..1d7e460b709af3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -70,13 +70,11 @@ public void testNormal() throws UserException, AnalysisException { BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); stmt.analyze(analyzer); - Assert.assertTrue(stmt.isAccurateMatch()); Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString()); LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral); stmt = new CancelLoadStmt(null, likePredicate); stmt.analyze(analyzer); - Assert.assertFalse(stmt.isAccurateMatch()); Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString()); } From c7635bd21f3dc4547e30e1fda81ddc0fbefbe87a Mon Sep 17 00:00:00 2001 From: Rongqian Li Date: Sat, 14 May 2022 19:25:00 +0800 Subject: [PATCH 2/2] =?UTF-8?q?MOD:=20=E5=AE=8C=E5=96=84=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/doris/analysis/CancelLoadStmt.java | 33 ++- .../apache/doris/common/CaseSensibility.java | 4 +- .../apache/doris/load/loadv2/LoadManager.java | 247 ++++++++++-------- .../doris/analysis/CancelLoadStmtTest.java | 163 +++++++----- 4 files changed, 260 insertions(+), 187 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index 82aa121c3f91c2..7af21948b5ddff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -29,12 +29,15 @@ import java.util.List; -// CANCEL LOAD statement used to cancel load job. -// -// syntax: -// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") +/** + * CANCEL LOAD statement used to cancel load job. + * syntax: + * CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") + **/ public class CancelLoadStmt extends DdlStmt { + private static final List SUPPORT_COLUMNS = Lists.newArrayList("label", "state"); + @Getter private String dbName; @@ -49,15 +52,13 @@ public class CancelLoadStmt extends DdlStmt { private Expr whereClause; - private static final List SUPPORT_COLUMNS = Lists.newArrayList("label", "state"); - public CancelLoadStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; } private void checkColumn(Expr expr, boolean like) throws AnalysisException { - String inputCol = ((SlotRef) expr.getChild(0)).getColumnName().toLowerCase(); + String inputCol = ((SlotRef) expr.getChild(0)).getColumnName(); if (!SUPPORT_COLUMNS.contains(inputCol)) { throw new AnalysisException("Current not support " + inputCol); } @@ -72,10 +73,13 @@ private void checkColumn(Expr expr, boolean like) throws AnalysisException { if (like && !inputValue.contains("%")) { inputValue = "%" + inputValue + "%"; } - if (inputCol.equals("label")) { + if (inputCol.equalsIgnoreCase("label")) { label = inputValue; } - if (inputCol.equals("state")) { + if (inputCol.equalsIgnoreCase("state")) { + if (like) { + throw new AnalysisException("Only label can use like"); + } state = inputValue; } } @@ -93,7 +97,7 @@ private void likeCheck(Expr expr) throws AnalysisException { private void binaryCheck(Expr expr) throws AnalysisException { if (expr instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause; + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; if (!Operator.EQ.equals(binaryPredicate.getOp())) { throw new AnalysisException("Only support equal or like"); } @@ -107,15 +111,16 @@ private void compoundCheck(Expr expr) throws AnalysisException { } if (expr instanceof CompoundPredicate) { // current only support label and state - CompoundPredicate compoundPredicate = (CompoundPredicate) whereClause; + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; for (int i = 0; i < 2; i++) { Expr child = compoundPredicate.getChild(i); if (child instanceof CompoundPredicate) { throw new AnalysisException("Current only support label and state"); } - likeCheck(expr); - binaryCheck(expr); + likeCheck(child); + binaryCheck(child); } + operator = compoundPredicate.getOp(); } } @@ -133,6 +138,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { // check auth after we get real load job // analyze expr + likeCheck(whereClause); + binaryCheck(whereClause); compoundCheck(whereClause); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java index 41c20b551ab1f4..651581a3c6e520 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java @@ -17,6 +17,9 @@ package org.apache.doris.common; +/** + * CaseSensibility Enum. + **/ public enum CaseSensibility { CLUSTER(true), DATABASE(true), @@ -28,7 +31,6 @@ public enum CaseSensibility { ROLE(false), HOST(false), LABEL(false), - STATE(false), VARIABLES(true), RESOURCE(true), CONFIG(true), diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index cb3ce3fcdf257e..5d54451ee8c19d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -18,7 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.CancelLoadStmt; -import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.CompoundPredicate.Operator; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -47,11 +47,11 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,7 +65,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -75,11 +74,10 @@ /** * The broker and mini load jobs(v2) are included in this class. - * * The lock sequence: * Database.lock - * LoadManager.lock - * LoadJob.lock + * LoadManager.lock + * LoadJob.lock */ public class LoadManager implements Writable { private static final Logger LOG = LogManager.getLogger(LoadManager.class); @@ -96,8 +94,6 @@ public LoadManager(LoadJobScheduler loadJobScheduler) { /** * This method will be invoked by the broker load(v2) now. - * @param stmt - * @throws DdlException */ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); @@ -116,8 +112,10 @@ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException { throw new DdlException("LoadManager only support the broker and spark load."); } if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) { - throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, " - + "please retry later. You can use `SHOW LOAD` to view submitted jobs"); + throw new DdlException( + "There are more than " + Config.desired_max_waiting_jobs + + " unfinished load jobs, please retry later. " + + "You can use `SHOW LOAD` to view submitted jobs"); } } @@ -143,9 +141,6 @@ private long unprotectedGetUnfinishedJobNum() { * This method will be invoked by streaming mini load. * It will begin the txn of mini load immediately without any scheduler . * - * @param request - * @return - * @throws UserException */ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException { String cluster = SystemInfoService.DEFAULT_CLUSTER; @@ -159,7 +154,8 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User try { loadJob = new MiniLoadJob(database.getId(), table.getId(), request); // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add. - // NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn(). + // NOTICE(cmy): this order is only for Mini Load, because mini load's + // unprotectedExecute() only do beginTxn(). // for other kind of load job, execute the job after adding job. // Mini load job must be executed before release write lock. // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun. @@ -168,7 +164,8 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User createLoadJob(loadJob); } catch (DuplicatedRequestException e) { // this is a duplicate request, just return previous txn id - LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId()); + LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), + e.getTxnId()); return e.getTxnId(); } catch (UserException e) { if (loadJob != null) { @@ -194,14 +191,12 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User * Step1: lock the load manager * Step2: check the label in load manager * Step3: call the addLoadJob of load class - * Step3.1: lock the load - * Step3.2: check the label in load - * Step3.3: add the loadJob in load rather than load manager - * Step3.4: unlock the load + * Step3.1: lock the load + * Step3.2: check the label in load + * Step3.3: add the loadJob in load rather than load manager + * Step3.4: unlock the load * Step4: unlock the load manager - * @param stmt - * @param timestamp - * @throws DdlException + * */ public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); @@ -219,10 +214,9 @@ public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long time * It is used to check the label of v1 and v2 at the same time. * Finally, the non-streaming mini load will belongs to load class. * - * @param request - * @return if: mini load is a duplicated load, return false. - * else: return true. - * @throws DdlException + * @param request request + * @return if: mini load is a duplicated load, return false. else: return true. + * @deprecated not support mini load */ @Deprecated public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException { @@ -240,6 +234,9 @@ public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlEx } } + /** + * MultiLoadMgr use. + **/ public void createLoadJobV1FromMultiStart(String fullDbName, String label) throws DdlException { Database database = checkDb(fullDbName); writeLock(); @@ -254,9 +251,7 @@ public void createLoadJobV1FromMultiStart(String fullDbName, String label) throw public void replayCreateLoadJob(LoadJob loadJob) { createLoadJob(loadJob); - LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) - .add("msg", "replay create load job") - .build()); + LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build()); } // add load job and also add to to callback factory @@ -266,7 +261,8 @@ private void createLoadJob(LoadJob loadJob) { return; } addLoadJob(loadJob); - // add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin + // add callback before txn if load job is uncompleted, + // because callback will be performed on replay without txn begin // register txn state listener if (!loadJob.isCompleted()) { Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); @@ -286,8 +282,11 @@ private void addLoadJob(LoadJob loadJob) { labelToLoadJobs.get(loadJob.getLabel()).add(loadJob); } + /** + * Record finished load job by editLog. + **/ public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType, - long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { + long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { // get db id Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbName); @@ -295,7 +294,8 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl); + loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, + trackingUrl); break; default: return; @@ -305,46 +305,37 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); } - private void addNeedCancelLoadJob(CancelLoadStmt stmt, List loadJobs, List matchLoadJobs) + /** + * Match need cancel loadJob by stmt. + **/ + @VisibleForTesting + public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List loadJobs, List matchLoadJobs) throws AnalysisException { String label = stmt.getLabel(); - if (StringUtils.isNotEmpty(label)) { - if (label.contains("%")) { - PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); - for (LoadJob loadJob : loadJobs) { - if (matcher.match(loadJob.getLabel())) { - matchLoadJobs.add(loadJob); - } - } - } else { - matchLoadJobs.addAll(loadJobs.stream().filter(job -> label.equals(job.getLabel())).collect(Collectors.toList())); - } - } String state = stmt.getState(); - if (StringUtils.isNotEmpty(state)) { - if (state.contains("%")) { - PatternMatcher matcher = PatternMatcher.createMysqlPattern(state, CaseSensibility.STATE.getCaseSensibility()); - if (CompoundPredicate.Operator.AND.equals(stmt.getOperator())) { - matchLoadJobs.removeIf(loadJob -> matcher.match(loadJob.getState().name().toLowerCase())); - } else { - for (LoadJob loadJob : loadJobs) { - if (matcher.match(loadJob.getState().name().toLowerCase())) { - matchLoadJobs.add(loadJob); - } - } - } - } else { - if (CompoundPredicate.Operator.AND.equals(stmt.getOperator())) { - matchLoadJobs.removeIf(loadJob -> state.equals(loadJob.getState().name().toLowerCase())); - } else { - matchLoadJobs.addAll( - loadJobs.stream().filter(job -> state.equals(job.getState().name().toLowerCase())) - .collect(Collectors.toList())); - } + PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); + matchLoadJobs.addAll(loadJobs.stream().filter(job -> { + if (stmt.getOperator() != null) { + // compound + boolean labelFilter = + label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label); + boolean stateFilter = job.getState().name().equalsIgnoreCase(state); + return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter : + labelFilter || stateFilter; } - } + if (StringUtils.isNotEmpty(label)) { + return label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + return job.getState().name().equalsIgnoreCase(state); + } + return false; + }).collect(Collectors.toList())); } + /** + * Cancel load job by stmt. + **/ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException { Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName()); // List of load jobs waiting to be cancelled @@ -355,13 +346,15 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce if (labelToLoadJobs == null) { throw new DdlException("Load job does not exist"); } - addNeedCancelLoadJob(stmt, labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), matchLoadJobs); + addNeedCancelLoadJob(stmt, + labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), + matchLoadJobs); if (matchLoadJobs.isEmpty()) { throw new DdlException("Load job does not exist"); } // check state here - List uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()) - .collect(Collectors.toList()); + List uncompletedLoadJob = + matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList()); if (uncompletedLoadJob.isEmpty()) { throw new DdlException("There is no uncompleted job"); } @@ -372,12 +365,17 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce try { loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel")); } catch (DdlException e) { - throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " + - "label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage()); + throw new DdlException( + "Cancel load job [" + loadJob.getId() + "] fail, " + "label=[" + loadJob.getLabel() + + + "] failed msg=" + e.getMessage()); } } } + /** + * Replay end load job. + **/ public void replayEndLoadJob(LoadJobFinalOperation operation) { LoadJob job = idToLoadJob.get(operation.getId()); if (job == null) { @@ -390,12 +388,13 @@ public void replayEndLoadJob(LoadJobFinalOperation operation) { return; } job.unprotectReadEndOperation(operation); - LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()) - .add("operation", operation) - .add("msg", "replay end load job") - .build()); + LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()).add("operation", operation) + .add("msg", "replay end load job").build()); } + /** + * Replay update load job. + **/ public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) { long jobId = info.getJobId(); LoadJob job = idToLoadJob.get(jobId); @@ -407,6 +406,9 @@ public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) { job.replayUpdateStateInfo(info); } + /** + * Get load job num, used by proc. + **/ public int getLoadJobNum(JobState jobState, long dbId) { readLock(); try { @@ -414,23 +416,31 @@ public int getLoadJobNum(JobState jobState, long dbId) { if (labelToLoadJobs == null) { return 0; } - List loadJobList = labelToLoadJobs.values().stream() - .flatMap(entity -> entity.stream()).collect(Collectors.toList()); + List loadJobList = + labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList()); return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count(); } finally { readUnlock(); } } + + /** + * Get load job num, used by metric. + **/ public long getLoadJobNum(JobState jobState, EtlJobType jobType) { readLock(); try { - return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType).count(); + return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType) + .count(); } finally { readUnlock(); } } + /** + * Remove old load job. + **/ public void removeOldLoadJob() { long currentTimeMs = System.currentTimeMillis(); @@ -460,7 +470,9 @@ public void removeOldLoadJob() { } } - // only for those jobs which have etl state, like SparkLoadJob + /** + * Only for those jobs which have etl state, like SparkLoadJob. + **/ public void processEtlStateJobs() { idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL)) .forEach(job -> { @@ -468,8 +480,8 @@ public void processEtlStateJobs() { ((SparkLoadJob) job).updateEtlStatus(); } catch (DataQualityException e) { LOG.info("update load job etl status failed. job id: {}", job.getId(), e); - job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), - true, true); + job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, + DataQualityException.QUALITY_FAIL_MSG), true, true); } catch (UserException e) { LOG.warn("update load job etl status failed. job id: {}", job.getId(), e); job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true); @@ -479,7 +491,9 @@ public void processEtlStateJobs() { }); } - // only for those jobs which load by PushTask + /** + * Only for those jobs which load by PushTask. + **/ public void processLoadingStateJobs() { idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.LOADING)) .forEach(job -> { @@ -496,16 +510,17 @@ public void processLoadingStateJobs() { /** * This method will return the jobs info which can meet the condition of input param. - * @param dbId used to filter jobs which belong to this db - * @param labelValue used to filter jobs which's label is or like labelValue. + * + * @param dbId used to filter jobs which belong to this db + * @param labelValue used to filter jobs which's label is or like labelValue. * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself. - * @param statesValue used to filter jobs which's state within the statesValue set. + * @param statesValue used to filter jobs which's state within the statesValue set. * @return The result is the list of jobInfo. - * JobInfo is a List which includes the comparable object: jobId, label, state etc. - * The result is unordered. + * JobInfo is a list which includes the comparable object: jobId, label, state etc. + * The result is unordered. */ - public List> getLoadJobInfosByDb(long dbId, String labelValue, - boolean accurateMatch, Set statesValue) throws AnalysisException { + public List> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch, + Set statesValue) throws AnalysisException { LinkedList> loadJobInfos = new LinkedList>(); if (!dbIdToLabelToLoadJobs.containsKey(dbId)) { return loadJobInfos; @@ -529,8 +544,8 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId); List loadJobList = Lists.newArrayList(); if (Strings.isNullOrEmpty(labelValue)) { - loadJobList.addAll(labelToLoadJobs.values() - .stream().flatMap(Collection::stream).collect(Collectors.toList())); + loadJobList.addAll( + labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); } else { // check label value if (accurateMatch) { @@ -540,7 +555,8 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, loadJobList.addAll(labelToLoadJobs.get(labelValue)); } else { // non-accurate match - PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); + PatternMatcher matcher = + PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); for (Map.Entry> entry : labelToLoadJobs.entrySet()) { if (matcher.match(entry.getKey())) { loadJobList.addAll(entry.getValue()); @@ -567,6 +583,9 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, } } + /** + * Get load job info. + **/ public void getLoadJobInfo(Load.JobInfo info) throws DdlException { String fullDbName = ClusterNamespace.getFullName(info.clusterName, info.dbName); info.dbName = fullDbName; @@ -600,8 +619,8 @@ public void prepareJobs() { } private void submitJobs() { - loadJobScheduler.submitJob(idToLoadJob.values().stream().filter( - loadJob -> loadJob.state == JobState.PENDING).collect(Collectors.toList())); + loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(loadJob -> loadJob.state == JobState.PENDING) + .collect(Collectors.toList())); } private void analyzeLoadJobs() { @@ -617,16 +636,13 @@ private Database checkDb(String dbName) throws DdlException { } /** - * step1: if label has been used in old load jobs which belong to load class - * step2: if label has been used in v2 load jobs - * step2.1: if label has been user in v2 load jobs, the create timestamp will be checked + * step1: if label has been used in old load jobs which belong to load class. + * step2: if label has been used in v2 load jobs. + * step2.1: if label has been user in v2 load jobs, the create timestamp will be checked. * - * @param dbId - * @param label * @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job. */ - private void checkLabelUsed(long dbId, String label) - throws DdlException { + private void checkLabelUsed(long dbId, String label) throws DdlException { // if label has been used in old load jobs Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label); // if label has been used in v2 of load jobs @@ -660,16 +676,22 @@ private void writeUnlock() { lock.writeLock().unlock(); } + /** + * Init. + **/ public void initJobProgress(Long jobId, TUniqueId loadId, Set fragmentIds, - List relatedBackendIds) { + List relatedBackendIds) { LoadJob job = idToLoadJob.get(jobId); if (job != null) { job.initLoadProgress(loadId, fragmentIds, relatedBackendIds); } } - public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, - long scannedRows, long scannedBytes, boolean isDone) { + /** + * Update. + **/ + public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + long scannedBytes, boolean isDone) { LoadJob job = idToLoadJob.get(jobId); if (job != null) { job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); @@ -679,7 +701,8 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId @Override public void write(DataOutput out) throws IOException { long currentTimeMs = System.currentTimeMillis(); - List loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList()); + List loadJobs = + idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList()); out.writeInt(loadJobs.size()); for (LoadJob loadJob : loadJobs) { @@ -687,6 +710,9 @@ public void write(DataOutput out) throws IOException { } } + /** + * Read from file. + **/ public void readFields(DataInput in) throws IOException { long currentTimeMs = System.currentTimeMillis(); int size = in.readInt(); @@ -706,12 +732,13 @@ public void readFields(DataInput in) throws IOException { if (loadJob.getState() == JobState.PENDING) { // bad case. When a mini load job is created and then FE restart. // the job will be in PENDING state forever. - // This is a temp solution to remove these jobs. And the mini load job should be deprecated in Doris v1.1 - TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState( - loadJob.getDbId(), loadJob.getTransactionId()); + // This is a temp solution to remove these jobs. + // And the mini load job should be deprecated in Doris v1.1 + TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr() + .getTransactionState(loadJob.getDbId(), loadJob.getTransactionId()); if (state == null) { - LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", - loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId()); + LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", loadJob.getId(), + loadJob.getDbId(), loadJob.getTransactionId()); continue; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java index 1d7e460b709af3..3621cbcbadde7d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -17,86 +17,123 @@ package org.apache.doris.analysis; -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.FakeCatalog; +import org.apache.doris.analysis.CompoundPredicate.Operator; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.load.loadv2.InsertLoadJob; +import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.utframe.TestWithFeService; -import mockit.Expectations; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; -public class CancelLoadStmtTest { - private Analyzer analyzer; - private Catalog catalog; +import java.util.ArrayList; +import java.util.List; - FakeCatalog fakeCatalog; +public class CancelLoadStmtTest extends TestWithFeService { - @Before - public void setUp() { - fakeCatalog = new FakeCatalog(); + private Analyzer analyzer; - catalog = AccessTestUtil.fetchAdminCatalog(); - FakeCatalog.setCatalog(catalog); + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase("testDb"); + useDatabase("testDb"); + createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + analyzer = new Analyzer(connectContext.getCatalog(), connectContext); + } - analyzer = AccessTestUtil.fetchAdminAnalyzer(true); - new Expectations(analyzer) { - { - analyzer.getDefaultDb(); - minTimes = 0; - result = "testCluster:testDb"; + @Test + public void testNormal() throws UserException { + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral = new StringLiteral("doris_test_label"); - analyzer.getQualifiedUser(); - minTimes = 0; - result = "testCluster:testUser"; + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("FINISHED"); - analyzer.getClusterName(); - minTimes = 0; - result = "testCluster"; + BinaryPredicate labelBinaryPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'", + stmt.toString()); - analyzer.getCatalog(); - minTimes = 0; - result = catalog; - } - }; - } + BinaryPredicate stateBinaryPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelLoadStmt(null, stateBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString()); - @Test - public void testNormal() throws UserException, AnalysisException { - SlotRef slotRef = new SlotRef(null, "label"); - StringLiteral stringLiteral = new StringLiteral("doris_test_label"); + LikePredicate labelLikePredicate = + new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral); + stmt = new CancelLoadStmt(null, labelLikePredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'", + stmt.toString()); - BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); - CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); + CompoundPredicate compoundAndPredicate = + new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate); + stmt = new CancelLoadStmt(null, compoundAndPredicate); stmt.analyze(analyzer); - Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString()); + Assertions.assertEquals( + "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'", + stmt.toString()); - LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral); - stmt = new CancelLoadStmt(null, likePredicate); + CompoundPredicate compoundOrPredicate = + new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate); + stmt = new CancelLoadStmt(null, compoundOrPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals( + "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'FINISHED'", + stmt.toString()); + + // test match + List loadJobs = new ArrayList<>(); + InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", ""); + loadJobs.add(insertLoadJob1); + InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", ""); + loadJobs.add(insertLoadJob2); + InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", ""); + loadJobs.add(insertLoadJob3); + // label + stmt = new CancelLoadStmt(null, labelBinaryPredicate); + stmt.analyze(analyzer); + List matchLoadJobs = new ArrayList<>(); + LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs); + Assertions.assertEquals(1, matchLoadJobs.size()); + // state + matchLoadJobs.clear(); + stmt = new CancelLoadStmt(null, stateBinaryPredicate); + stmt.analyze(analyzer); + LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs); + Assertions.assertEquals(3, matchLoadJobs.size()); + // or + matchLoadJobs.clear(); + stmt = new CancelLoadStmt(null, compoundOrPredicate); stmt.analyze(analyzer); - Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString()); + LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs); + Assertions.assertEquals(3, matchLoadJobs.size()); + // and + matchLoadJobs.clear(); + stmt = new CancelLoadStmt(null, compoundAndPredicate); + stmt.analyze(analyzer); + LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs); + Assertions.assertEquals(1, matchLoadJobs.size()); } - @Test(expected = AnalysisException.class) - public void testNoDb() throws UserException, AnalysisException { - SlotRef slotRef = new SlotRef(null, "label"); - StringLiteral stringLiteral = new StringLiteral("doris_test_label"); - new Expectations(analyzer) { - { - analyzer.getDefaultDb(); - minTimes = 0; - result = ""; - - analyzer.getClusterName(); - minTimes = 0; - result = "testCluster"; - } - }; - - BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral); - CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); - stmt.analyze(analyzer); - Assert.fail("No exception throws."); + @Test + public void testError() { + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("FINISHED"); + + LikePredicate stateLikePredicate = + new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral); + CancelLoadStmt stmt = new CancelLoadStmt(null, stateLikePredicate); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like", + () -> stmt.analyze(analyzer)); } }