From 2c873a0b14882fbac1c54abc41c48286cd4d3b3c Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Tue, 7 May 2019 14:29:10 +0800 Subject: [PATCH 1/2] Add show load of loadv2 This change include the show load of loadv2 and some bug fix of loadv2. Firstly, the show load will perform both load and loadv2 info. According to loadv2, the ETL progress of loadv2 is N/A during the period of loading. Secondly, the loadv2 will be created when version of property is v2. This is a temporary property which will not influence the old broker load. After the loadv2 is finished, the default load will be changed to loadv2. Finally, there are some bug in LoadingTaskPlanner fixed by this change. --- be/src/util/uid_util.cpp | 6 +- .../org/apache/doris/analysis/LoadStmt.java | 21 ++++ .../org/apache/doris/catalog/Catalog.java | 3 + .../apache/doris/common/proc/LoadProcDir.java | 2 +- .../main/java/org/apache/doris/load/Load.java | 11 +-- .../doris/load/loadv2/BrokerLoadJob.java | 66 +++++++++---- .../load/loadv2/BrokerLoadPendingTask.java | 4 +- .../loadv2/BrokerLoadingTaskAttachment.java | 5 +- .../loadv2/BrokerPendingTaskAttachment.java | 6 +- .../org/apache/doris/load/loadv2/LoadJob.java | 99 +++++++++++++++---- .../doris/load/loadv2/LoadJobScheduler.java | 2 +- .../doris/load/loadv2/LoadLoadingTask.java | 40 ++++---- .../apache/doris/load/loadv2/LoadManager.java | 59 +++++++++++ .../apache/doris/load/loadv2/LoadTask.java | 23 +++-- .../doris/load/loadv2/LoadTaskCallback.java | 4 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 9 ++ .../doris/load/loadv2/TaskAttachment.java | 11 ++- .../load/routineload/RoutineLoadJob.java | 1 - .../java/org/apache/doris/qe/DdlExecutor.java | 8 +- .../org/apache/doris/qe/ShowExecutor.java | 24 ++++- .../doris/transaction/TabletCommitInfo.java | 7 ++ 21 files changed, 317 insertions(+), 94 deletions(-) diff --git a/be/src/util/uid_util.cpp b/be/src/util/uid_util.cpp index dcbef4ca5165a8..73d31db0624fdb 100644 --- a/be/src/util/uid_util.cpp +++ b/be/src/util/uid_util.cpp @@ -26,13 +26,13 @@ std::ostream& operator<<(std::ostream& os, const UniqueId& uid) { std::string print_id(const TUniqueId& id) { std::stringstream out; - out << std::hex << id.hi << ":" << id.lo; + out << std::hex << id.hi << "-" << id.lo; return out.str(); } std::string print_id(const PUniqueId& id) { std::stringstream out; - out << std::hex << id.hi() << ":" << id.lo(); + out << std::hex << id.hi() << "-" << id.lo(); return out.str(); } @@ -40,7 +40,7 @@ bool parse_id(const std::string& s, TUniqueId* id) { DCHECK(id != NULL); const char* hi_part = s.c_str(); - char* colon = const_cast(strchr(hi_part, ':')); + char* colon = const_cast(strchr(hi_part, '-')); if (colon == NULL) { return false; diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java index d438a33561dd2a..d82ca30a5fc4f0 100644 --- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -21,6 +21,8 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadManager; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Function; @@ -57,6 +59,7 @@ public class LoadStmt extends DdlStmt { public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag"; public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String CLUSTER_PROPERTY = "cluster"; + private static final String VERSION = "version"; // for load data from Baidu Object Store(BOS) public static final String BOS_ENDPOINT = "bos_endpoint"; @@ -78,6 +81,8 @@ public class LoadStmt extends DdlStmt { private final Map properties; private String user; + private static String version = "v1"; + // properties set private final static ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(TIMEOUT_PROPERTY) @@ -85,6 +90,7 @@ public class LoadStmt extends DdlStmt { .add(LOAD_DELETE_FLAG_PROPERTY) .add(EXEC_MEM_LIMIT) .add(CLUSTER_PROPERTY) + .add(VERSION) .build(); public LoadStmt(LabelName label, List dataDescriptions, @@ -170,6 +176,17 @@ public static void checkProperties(Map properties) throws DdlExc throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " is not a number."); } } + + // version + final String versionProperty = properties.get(VERSION); + // TODO(ml): only support v1 + if (versionProperty != null) { + if (!versionProperty.equalsIgnoreCase(LoadManager.VERSION)) { + throw new DdlException(VERSION + " must be " + LoadManager.VERSION); + } + version = LoadManager.VERSION; + } + } @Override @@ -203,6 +220,10 @@ public boolean needAuditEncryption() { return false; } + public String getVersion() { + return version; + } + @Override public String toSql() { StringBuilder sb = new StringBuilder(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index b01c2b98cf965e..17de41452519ce 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1080,6 +1080,9 @@ private void transferToMaster() throws IOException { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); + // New load scheduler + loadJobScheduler.start(); + // Export checker ExportChecker.init(Config.export_checker_interval_second * 1000L); ExportChecker.startAll(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index 1082502a9cbc33..d07f1d9b0754eb 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -60,7 +60,7 @@ public ProcResult fetchResult() throws AnalysisException { result.setNames(TITLE_NAMES); LinkedList> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), - null, false, null, null); + null, false, null); int counter = 0; Iterator> iterator = loadJobInfos.descendingIterator(); while (iterator.hasNext()) { diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 3368a616011713..9399c229c729ba 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -1399,7 +1399,7 @@ public List getCopiedAsyncDeleteJobs() { } public LinkedList> getLoadJobInfosByDb(long dbId, String dbName, String labelValue, - boolean accurateMatch, Set states, ArrayList orderByPairs) { + boolean accurateMatch, Set states) { LinkedList> loadJobInfos = new LinkedList>(); readLock(); try { @@ -1549,15 +1549,6 @@ public LinkedList> getLoadJobInfosByDb(long dbId, String dbName readUnlock(); } - ListComparator> comparator = null; - if (orderByPairs != null) { - OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()]; - comparator = new ListComparator>(orderByPairs.toArray(orderByPairArr)); - } else { - // sort by id asc - comparator = new ListComparator>(0); - } - Collections.sort(loadJobInfos, comparator); return loadJobInfos; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 87968345e54c69..24d103c9fda6f1 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -37,6 +37,8 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.PullLoadSourceInfo; +import com.google.common.base.Joiner; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,13 +59,11 @@ public class BrokerLoadJob extends LoadJob { // include broker desc and data desc private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo(); - // it will be set to true when pending task finished - private boolean isLoading = false; - public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc) { super(dbId, label); this.timeoutSecond = Config.pull_load_task_default_timeout_second; this.brokerDesc = brokerDesc; + this.jobType = org.apache.doris.load.LoadJob.EtlJobType.BROKER; } public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { @@ -116,8 +116,8 @@ public void onTaskFinished(TaskAttachment attachment) { } @Override - public void onTaskFailed(String errMsg) { - cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, errMsg); + public void onTaskFailed(FailMsg failMsg) { + cancelJobWithoutCheck(failMsg); } /** @@ -136,28 +136,30 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) { .build()); return; } - if (isLoading) { + if (finishedTaskIds.contains(attachment.getTaskId())) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("task_id", attachment.getTaskId()) .add("error_msg", "this is a duplicated callback of pending task " + "when broker already has loading task") .build()); return; } - isLoading = true; + + // add task id into finishedTaskIds + finishedTaskIds.add(attachment.getTaskId()); } finally { writeUnlock(); } - Database db = null; try { - db = getDb(); + Database db = getDb(); createLoadingTask(db, attachment); } catch (UserException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("database_id", dbId) .add("error_msg", "Failed to divide job into loading task.") .build(), e); - cancelJobWithoutCheck(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage())); return; } @@ -208,25 +210,51 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { return; } + // check if task has been finished + if (finishedTaskIds.contains(attachment.getTaskId())) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("task_id", attachment.getTaskId()) + .add("error_msg", "this is a duplicated callback of loading task").build()); + return; + } + // update loading status + finishedTaskIds.add(attachment.getTaskId()); updateLoadingStatus(attachment); // begin commit txn when all of loading tasks have been finished - if (!(tasks.size() == tasks.stream() - .filter(entity -> entity.isFinished()).count())) { + if (finishedTaskIds.size() != tasks.size()) { return; } // check data quality if (!checkDataQuality()) { - executeCancel(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG); + executeCancel(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG)); return; } } finally { writeUnlock(); } + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("commit_infos", Joiner.on(",").join(commitInfos)) + .build()); + } + + Database db = null; + try { + db = getDb(); + } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "db has been deleted when job is loading") + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage())); + } + db.writeLock(); try { + Catalog.getCurrentGlobalTransactionMgr().commitTransaction( dbId, transactionId, commitInfos); } catch (UserException e) { @@ -234,8 +262,10 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { .add("database_id", dbId) .add("error_msg", "Failed to commit txn.") .build(), e); - cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage())); return; + } finally { + db.writeUnlock(); } } @@ -248,15 +278,17 @@ private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) { loadingStatus.setTrackingUrl(attachment.getTrackingUrl()); } commitInfos.addAll(attachment.getCommitInfoList()); - int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count(); - progress = finishedTaskNum / tasks.size() * 100; + progress = finishedTaskIds.size() / tasks.size() * 100; if (progress == 100) { progress = 99; } } private String increaseCounter(String key, String deltaValue) { - long value = Long.valueOf(loadingStatus.getCounters().get(key)); + long value = 0; + if (loadingStatus.getCounters().containsKey(key)) { + value = Long.valueOf(loadingStatus.getCounters().get(key)); + } if (deltaValue != null) { value += Long.valueOf(deltaValue); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index e178cd34a8e843..3f2a6bd5b51e71 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -26,6 +26,7 @@ import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.FailMsg; import org.apache.doris.thrift.TBrokerFileStatus; import com.google.common.collect.Lists; @@ -47,9 +48,10 @@ public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback, Map> tableToBrokerFileList, BrokerDesc brokerDesc) { super(loadTaskCallback); - this.attachment = new BrokerPendingTaskAttachment(); + this.attachment = new BrokerPendingTaskAttachment(signature); this.tableToBrokerFileList = tableToBrokerFileList; this.brokerDesc = brokerDesc; + this.failMsg = new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, null); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java index 052d1e19955a01..1d3b042cf5a42d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java @@ -25,14 +25,15 @@ import java.util.List; import java.util.Map; -public class BrokerLoadingTaskAttachment implements TaskAttachment{ +public class BrokerLoadingTaskAttachment extends TaskAttachment { private Map counters; private String trackingUrl; private List commitInfoList; - public BrokerLoadingTaskAttachment(Map counters, String trackingUrl, + public BrokerLoadingTaskAttachment(long taskId, Map counters, String trackingUrl, List commitInfoList) { + super(taskId); this.trackingUrl = trackingUrl; this.counters = counters; this.commitInfoList = commitInfoList; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java index 55956fa67a1620..5d7c5db70a0167 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java @@ -27,13 +27,17 @@ import java.util.List; import java.util.Map; -public class BrokerPendingTaskAttachment implements TaskAttachment { +public class BrokerPendingTaskAttachment extends TaskAttachment { // table id -> file status private Map>> fileStatusMap = Maps.newHashMap(); // table id -> total file num private Map fileNumMap = Maps.newHashMap(); + public BrokerPendingTaskAttachment(long taskId) { + super(taskId); + } + public void addFileStatus(long tableId, List> fileStatusList) { fileStatusMap.put(tableId, fileStatusList); fileNumMap.put(tableId, fileStatusList.stream().mapToInt(entity -> entity.size()).sum()); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 0351798f6b6355..d22c8e8e3d123b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -32,6 +32,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.load.Load; @@ -46,14 +47,18 @@ import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback { @@ -68,6 +73,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected long dbId; protected String label; protected JobState state = JobState.PENDING; + protected org.apache.doris.load.LoadJob.EtlJobType jobType; // optional properties // timeout second need to be reset in constructor of subclass @@ -83,6 +89,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected long transactionId; protected FailMsg failMsg; protected List tasks = Lists.newArrayList(); + protected List finishedTaskIds = Lists.newArrayList(); protected EtlStatus loadingStatus = new EtlStatus(); // 0: the job status is pending // n/100: n is the number of task which has been finished @@ -154,13 +161,8 @@ public long getFinishTimestamp() { return finishTimestamp; } - public boolean isFinished() { - readLock(); - try { - return state == JobState.FINISHED || state == JobState.CANCELLED; - } finally { - readUnlock(); - } + protected boolean isFinished() { + return state == JobState.FINISHED || state == JobState.CANCELLED; } protected void setJobProperties(Map properties) throws DdlException { @@ -255,7 +257,7 @@ public void processTimeout() { if (isFinished() || getDeadlineMs() >= System.currentTimeMillis() || isCommitting) { return; } - executeCancel(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel"); + executeCancel(new FailMsg(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel")); } finally { writeUnlock(); } @@ -290,16 +292,16 @@ private void executeLoad() { state = JobState.LOADING; } - public void cancelJobWithoutCheck(FailMsg.CancelType cancelType, String errMsg) { + public void cancelJobWithoutCheck(FailMsg failMsg) { writeLock(); try { - executeCancel(cancelType, errMsg); + executeCancel(failMsg); } finally { writeUnlock(); } } - public void cancelJob(FailMsg.CancelType cancelType, String errMsg) throws DdlException { + public void cancelJob(FailMsg failMsg) throws DdlException { writeLock(); try { if (isCommitting) { @@ -308,15 +310,15 @@ public void cancelJob(FailMsg.CancelType cancelType, String errMsg) throws DdlEx + "The job could not be cancelled in this step").build()); throw new DdlException("Job could not be cancelled while txn is committing"); } - executeCancel(cancelType, errMsg); + executeCancel(failMsg); } finally { writeUnlock(); } } - protected void executeCancel(FailMsg.CancelType cancelType, String errMsg) { + protected void executeCancel(FailMsg failMsg) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("error_msg", "Failed to execute load with error " + errMsg) + .add("error_msg", "Failed to execute load with error " + failMsg.getMsg()) .build()); // reset txn id @@ -333,7 +335,7 @@ protected void executeCancel(FailMsg.CancelType cancelType, String errMsg) { tasks.clear(); // set failMsg and state - failMsg = new FailMsg(cancelType, errMsg); + this.failMsg = failMsg; finishTimestamp = System.currentTimeMillis(); state = JobState.CANCELLED; @@ -365,6 +367,69 @@ protected boolean checkDataQuality() { return true; } + public List getShowInfo() { + readLock(); + try { + List jobInfo = Lists.newArrayList(); + // jobId + jobInfo.add(id); + // label + jobInfo.add(label); + // state + jobInfo.add(state.name()); + + // progress + switch (state) { + case PENDING: + jobInfo.add("ETL:N/A; LOAD:0%"); + break; + case CANCELLED: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + default: + jobInfo.add("ETL:N/A; LOAD:" + progress + "%"); + break; + } + + // type + jobInfo.add(jobType); + + // etl info + if (loadingStatus.getCounters().size() == 0) { + jobInfo.add("N/A"); + } else { + jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadingStatus.getCounters())); + } + + // task info + jobInfo.add("cluster:N/A" + "; timeout(s):" + timeoutSecond + + "; max_filter_ratio:" + maxFilterRatio); + + // error msg + if (failMsg == null) { + jobInfo.add("N/A"); + } else { + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } + + // create time + jobInfo.add(TimeUtils.longToTimeString(createTimestamp)); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); + // load start time + jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); + // load end time + jobInfo.add(TimeUtils.longToTimeString(finishTimestamp)); + // tracking url + jobInfo.add(loadingStatus.getTrackingUrl()); + return jobInfo; + } finally { + readUnlock(); + } + } + @Override public long getCallbackId() { return id; @@ -418,7 +483,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String return; } // cancel load job - executeCancel(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason); + executeCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason)); } finally { writeUnlock(); } @@ -426,7 +491,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String @Override public void replayOnAborted(TransactionState txnState) { - cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, null); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, null)); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java index 9d72b8aa66bc08..74b2f4f54abe76 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java @@ -74,7 +74,7 @@ private void process() throws InterruptedException { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) .add("error_msg", "There are error properties in job. Job will be cancelled") .build(), e); - loadJob.cancelJobWithoutCheck(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()); + loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage())); continue; } catch (BeginTransactionException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 0b91a72fe916b8..9556db80a0d903 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -23,11 +23,14 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.LoadException; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.Load; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TBrokerFileStatus; @@ -67,6 +70,7 @@ public LoadLoadingTask(Database db, OlapTable table, this.jobDeadlineMs = jobDeadlineMs; this.execMemLimit = execMemLimit; this.txnId = txnId; + this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, null); } public void init(List> fileStatusList, int fileNum) throws UserException { @@ -78,7 +82,7 @@ public void init(List> fileStatusList, int fileNum) thro protected void executeTask() throws UserException { int retryTime = 3; for (int i = 0; i < retryTime; ++i) { - isFinished = executeOnce(); + boolean isFinished = executeOnce(); if (isFinished) { return; } @@ -90,7 +94,7 @@ private boolean executeOnce() { // New one query id, UUID uuid = UUID.randomUUID(); TUniqueId executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - Coordinator curCoordinator = new Coordinator(-1L, executeId, planner.getDescTable(), + Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), executeId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes(), db.getClusterName()); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); @@ -99,10 +103,11 @@ private boolean executeOnce() { try { QeProcessorImpl.INSTANCE .registerQuery(executeId, curCoordinator); - return actualExecute(curCoordinator); - } catch (UserException e) { + actualExecute(curCoordinator); + return true; + } catch (Exception e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("error_msg", "failed to execute loading task") + .add("error_msg", "coordinator execute failed in loading task") .build(), e); errMsg = e.getMessage(); return false; @@ -111,36 +116,25 @@ private boolean executeOnce() { } } - private boolean actualExecute(Coordinator curCoordinator) { + private void actualExecute(Coordinator curCoordinator) throws Exception { int waitSecond = (int) (getLeftTimeMs() / 1000); if (waitSecond <= 0) { - errMsg = "time out"; - return false; + throw new LoadException("failed to execute plan when the left time is less then 0"); } - try { - curCoordinator.exec(); - } catch (Exception e) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("error_msg", "coordinator execute failed") - .build(), e); - errMsg = "coordinator execute failed with error " + e.getMessage(); - return false; - } + curCoordinator.exec(); if (curCoordinator.join(waitSecond)) { Status status = curCoordinator.getExecStatus(); if (status.ok()) { - attachment = new BrokerLoadingTaskAttachment(curCoordinator.getLoadCounters(), + attachment = new BrokerLoadingTaskAttachment(signature, + curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl(), TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos())); - return true; } else { - errMsg = status.getErrorMsg(); - return false; + throw new LoadException(status.getErrorMsg()); } } else { - errMsg = "coordinator could not finished before job timeout"; - return false; + throw new LoadException("coordinator could not finished before job timeout"); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d421cf564dc195..edd1772369ad8c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -26,15 +26,23 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.util.LogBuilder; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.transaction.TransactionState; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -43,6 +51,9 @@ * The broker and mini load jobs(v2) are included in this class. */ public class LoadManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + public static final String VERSION = "v2"; + private Map idToLoadJob = Maps.newConcurrentMap(); private Map>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap(); private LoadJobScheduler loadJobScheduler; @@ -118,10 +129,57 @@ public void processTimeoutJobs() { idToLoadJob.values().stream().forEach(entity -> entity.processTimeout()); } + public List> getLoadJobInfosByDb(long dbId, String labelValue, + boolean accurateMatch, List statesValue) { + LinkedList> loadJobInfos = new LinkedList>(); + if (!dbIdToLabelToLoadJobs.containsKey(dbId)) { + return loadJobInfos; + } + + List states = Lists.newArrayList(); + if (statesValue == null || statesValue.size() == 0) { + states.addAll(EnumSet.allOf(JobState.class)); + } else { + for (String stateValue : statesValue) { + try { + states.add(JobState.valueOf(stateValue)); + } catch (IllegalArgumentException e) { + // ignore this state + } + } + } + + readLock(); + try { + Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId); + if (accurateMatch) { + if (!labelToLoadJobs.containsKey(labelValue)) { + return loadJobInfos; + } + loadJobInfos.addAll(labelToLoadJobs.get(labelValue).stream() + .filter(entity -> states.contains(entity.getState())) + .map(entity -> entity.getShowInfo()).collect(Collectors.toList())); + return loadJobInfos; + } + + for (Map.Entry> entry : labelToLoadJobs.entrySet()) { + if (entry.getKey().contains(labelValue)) { + loadJobInfos.addAll(entry.getValue().stream() + .filter(entity -> states.contains(entity.getState())) + .map(entity -> entity.getShowInfo()).collect(Collectors.toList())); + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + private Database checkDb(String dbName) throws DdlException { // get db Database db = Catalog.getInstance().getDb(dbName); if (db == null) { + LOG.warn("Database {} does not exist", dbName); throw new DdlException("Database[" + dbName + "] does not exist"); } return db; @@ -145,6 +203,7 @@ private void isLabelUsed(long dbId, String label) if (labelToLoadJobs.containsKey(label)) { List labelLoadJobs = labelToLoadJobs.get(label); if (labelLoadJobs.stream().filter(entity -> !entity.isFinished()).count() != 0) { + LOG.warn("Failed to add load job when label {} has been used.", label); throw new LabelAlreadyUsedException(label); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 156745430738af..3a95006aabd25f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -20,13 +20,15 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.catalog.Catalog; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.FailMsg; import org.apache.doris.task.MasterTask; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public abstract class LoadTask extends MasterTask { @@ -34,37 +36,34 @@ public abstract class LoadTask extends MasterTask { protected LoadTaskCallback callback; protected TaskAttachment attachment; - protected boolean isFinished = false; + protected FailMsg failMsg = new FailMsg(); - public LoadTask(LoadTaskCallback callback) { + public LoadTask(LoadTaskCallback callback){ + this.signature = Catalog.getCurrentCatalog().getNextId(); this.callback = callback; } @Override protected void exec() { - Exception exception = null; + boolean isFinished = false; try { // execute pending task executeTask(); - isFinished = true; // callback on pending task finished callback.onTaskFinished(attachment); + isFinished = true; } catch (Exception e) { - exception = e; + failMsg.setMsg(e.getMessage()); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) .add("error_msg", "Failed to execute load task").build(), e); } finally { if (!isFinished) { // callback on pending task failed - callback.onTaskFailed(exception == null ? "unknown error" : exception.getMessage()); + callback.onTaskFailed(failMsg); } } } - public boolean isFinished() { - return isFinished; - } - /** * execute load task * diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java index 838867dd4e1dcb..de9cfd41b3906d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java @@ -20,10 +20,12 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.load.FailMsg; + public interface LoadTaskCallback { long getCallbackId(); void onTaskFinished(TaskAttachment attachment); - void onTaskFailed(String errMsg); + void onTaskFailed(FailMsg failMsg); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 6bf852a388ed36..9abea3bcd44f8d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -44,6 +44,7 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -104,7 +105,9 @@ public void plan(List> fileStatusesList, int filesAdded) fileStatusesList, filesAdded); scanNode.setLoadInfo(table, brokerDesc, fileGroups); scanNode.init(analyzer); + scanNode.finalize(analyzer); scanNodes.add(scanNode); + descTable.computeMemLayout(); // 2. Olap table sink String partitionNames = convertBrokerDescPartitionInfo(); @@ -147,9 +150,15 @@ public List getScanNodes() { private String convertBrokerDescPartitionInfo() { String result = ""; for (BrokerFileGroup brokerFileGroup : fileGroups) { + if (brokerFileGroup.getPartitionNames() == null) { + continue; + } result += Joiner.on(",").join(brokerFileGroup.getPartitionNames()); result += ","; } + if (Strings.isNullOrEmpty(result)) { + return null; + } result = result.substring(0, result.length() - 2); return result; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java index 14363fb72ee136..422843cb7fd40a 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java @@ -20,5 +20,14 @@ package org.apache.doris.load.loadv2; -public interface TaskAttachment { +public class TaskAttachment { + private long taskId; + + public TaskAttachment(long taskId) { + this.taskId = taskId; + } + + public long getTaskId() { + return taskId; + } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 518f8b1a1f0437..c0073df93961ad 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -50,7 +50,6 @@ import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; -import org.apache.doris.transaction.TxnStateChangeCallback; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index dfc90a54e68152..e85ce1a7c713bc 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -67,6 +67,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.load.LoadJob.EtlJobType; +import org.apache.doris.load.loadv2.LoadManager; /** * Created by zhaochun on 14/11/10. @@ -112,7 +113,12 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt, String origStmt) th } jobType = EtlJobType.HADOOP; } - catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis()); + // TODO(ml): WIP + if (loadStmt.getVersion().equals(LoadManager.VERSION)) { + catalog.getLoadManager().createLoadJobFromStmt(loadStmt); + } else { + catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis()); + } } else if (ddlStmt instanceof CancelLoadStmt) { catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt); } else if (ddlStmt instanceof CreateRoutineLoadStmt) { diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 532d8847120663..9d47f13cacb334 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -90,8 +90,10 @@ import org.apache.doris.common.proc.PartitionsProcDir; import org.apache.doris.common.proc.ProcNodeInterface; import org.apache.doris.common.proc.TabletsProcDir; +import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.OrderByPair; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; @@ -118,10 +120,12 @@ import java.net.URL; import java.net.URLConnection; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; // Execute one show statement. public class ShowExecutor { @@ -648,8 +652,24 @@ private void handleShowLoad() throws AnalysisException { List> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(), showStmt.isAccurateMatch(), - showStmt.getStates(), - showStmt.getOrderByPairs()); + showStmt.getStates()); + List statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream() + .map(entity -> entity.name()) + .collect(Collectors.toList()); + loadInfos.addAll(catalog.getLoadManager().getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), + showStmt.isAccurateMatch(), + statesValue)); + List orderByPairs = showStmt.getOrderByPairs(); + ListComparator> comparator = null; + if (orderByPairs != null) { + OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()]; + comparator = new ListComparator>(orderByPairs.toArray(orderByPairArr)); + } else { + // sort by id asc + comparator = new ListComparator>(0); + } + Collections.sort(loadInfos, comparator); + List> rows = Lists.newArrayList(); for (List loadInfo : loadInfos) { List oneInfo = new ArrayList(loadInfo.size()); diff --git a/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java b/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java index 91f6f43d43b854..06c4e908f2d82d 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java +++ b/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java @@ -21,6 +21,7 @@ import org.apache.doris.thrift.TTabletCommitInfo; import com.google.common.collect.Lists; +import com.google.gson.Gson; import java.io.DataInput; import java.io.DataOutput; @@ -65,4 +66,10 @@ public void readFields(DataInput in) throws IOException { tabletId = in.readLong(); backendId = in.readLong(); } + + @Override + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } } From c4f3663d6c01f5fb2455e06ad1826b8af8ff9369 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Thu, 9 May 2019 10:08:10 +0800 Subject: [PATCH 2/2] Add some annotation --- .../org/apache/doris/load/loadv2/LoadJob.java | 4 +--- .../apache/doris/load/loadv2/LoadManager.java | 17 ++++++++++++++--- .../java/org/apache/doris/qe/ShowExecutor.java | 7 +++++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index d22c8e8e3d123b..ebc9fdacca8e66 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -52,7 +52,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -89,7 +88,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected long transactionId; protected FailMsg failMsg; protected List tasks = Lists.newArrayList(); - protected List finishedTaskIds = Lists.newArrayList(); + protected Set finishedTaskIds = Sets.newHashSet(); protected EtlStatus loadingStatus = new EtlStatus(); // 0: the job status is pending // n/100: n is the number of task which has been finished @@ -327,7 +326,6 @@ protected void executeCancel(FailMsg failMsg) { } // clean the loadingStatus - loadingStatus.reset(); loadingStatus.setState(TEtlState.CANCELLED); // tasks will not be removed from task pool. diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index edd1772369ad8c..28375289e5917f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -32,6 +32,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -129,14 +130,24 @@ public void processTimeoutJobs() { idToLoadJob.values().stream().forEach(entity -> entity.processTimeout()); } + /** + * This method will return the jobs info which can meet the condition of input param. + * @param dbId used to filter jobs which belong to this db + * @param labelValue used to filter jobs which's label is or like labelValue. + * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself. + * @param statesValue used to filter jobs which's state within the statesValue set. + * @return The result is the list of jobInfo. + * JobInfo is a List which includes the comparable object: jobId, label, state etc. + * The result is unordered. + */ public List> getLoadJobInfosByDb(long dbId, String labelValue, - boolean accurateMatch, List statesValue) { + boolean accurateMatch, Set statesValue) { LinkedList> loadJobInfos = new LinkedList>(); if (!dbIdToLabelToLoadJobs.containsKey(dbId)) { return loadJobInfos; } - List states = Lists.newArrayList(); + Set states = Sets.newHashSet(); if (statesValue == null || statesValue.size() == 0) { states.addAll(EnumSet.allOf(JobState.class)); } else { @@ -161,7 +172,7 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, .map(entity -> entity.getShowInfo()).collect(Collectors.toList())); return loadJobInfos; } - + List loadJobList = Lists.newArrayList(); for (Map.Entry> entry : labelToLoadJobs.entrySet()) { if (entry.getKey().contains(labelValue)) { loadJobInfos.addAll(entry.getValue().stream() diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 9d47f13cacb334..b18331be993f3c 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -648,17 +648,20 @@ private void handleShowLoad() throws AnalysisException { } long dbId = db.getId(); + // combine the List of load(v1) and loadManager(v2) Load load = catalog.getLoadInstance(); List> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(), showStmt.isAccurateMatch(), showStmt.getStates()); - List statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream() + Set statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream() .map(entity -> entity.name()) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); loadInfos.addAll(catalog.getLoadManager().getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), showStmt.isAccurateMatch(), statesValue)); + + // order the result of List by orderByPairs in show stmt List orderByPairs = showStmt.getOrderByPairs(); ListComparator> comparator = null; if (orderByPairs != null) {