diff --git a/be/src/util/uid_util.cpp b/be/src/util/uid_util.cpp
index dcbef4ca5165a8..73d31db0624fdb 100644
--- a/be/src/util/uid_util.cpp
+++ b/be/src/util/uid_util.cpp
@@ -26,13 +26,13 @@ std::ostream& operator<<(std::ostream& os, const UniqueId& uid) {
 
 std::string print_id(const TUniqueId& id) {
     std::stringstream out;
-    out << std::hex << id.hi << ":" << id.lo;
+    out << std::hex << id.hi << "-" << id.lo;
     return out.str();
 }
 
 std::string print_id(const PUniqueId& id) {
     std::stringstream out;
-    out << std::hex << id.hi() << ":" << id.lo();
+    out << std::hex << id.hi() << "-" << id.lo();
     return out.str();
 }
@@ -40,7 +40,7 @@ bool parse_id(const std::string& s, TUniqueId* id) {
     DCHECK(id != NULL);
     const char* hi_part = s.c_str();
-    char* colon = const_cast<char*>(strchr(hi_part, ':'));
+    char* colon = const_cast<char*>(strchr(hi_part, '-'));
 
     if (colon == NULL) {
         return false;
diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
index d438a33561dd2a..d82ca30a5fc4f0 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,6 +21,8 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Function;
@@ -57,6 +59,7 @@ public class LoadStmt extends DdlStmt {
     public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag";
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String CLUSTER_PROPERTY = "cluster";
+    private static final String VERSION = "version";
 
     // for load data from Baidu Object Store(BOS)
     public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -78,6 +81,8 @@ public class LoadStmt extends DdlStmt {
     private final Map<String, String> properties;
     private String user;
 
+    private static String version = "v1";
+
     // properties set
     private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(TIMEOUT_PROPERTY)
@@ -85,6 +90,7 @@ public class LoadStmt extends DdlStmt {
             .add(LOAD_DELETE_FLAG_PROPERTY)
             .add(EXEC_MEM_LIMIT)
             .add(CLUSTER_PROPERTY)
+            .add(VERSION)
             .build();
 
     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
@@ -170,6 +176,17 @@ public static void checkProperties(Map<String, String> properties) throws DdlException {
             throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " is not a number.");
         }
     }
+
+        // version
+        final String versionProperty = properties.get(VERSION);
+        // TODO(ml): only support v1
+        if (versionProperty != null) {
+            if (!versionProperty.equalsIgnoreCase(LoadManager.VERSION)) {
+                throw new DdlException(VERSION + " must be " + LoadManager.VERSION);
+            }
+            version = LoadManager.VERSION;
+        }
+    }
 
     @Override
@@ -203,6 +220,10 @@ public boolean needAuditEncryption() {
         return false;
     }
 
+    public String getVersion() {
+        return version;
+    }
+
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
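
The new "version" property gates which load pipeline a LOAD statement takes: absent, it defaults to "v1" (the legacy Load path); if present it must equal LoadManager.VERSION ("v2"), which routes the statement to the new LoadManager. A minimal standalone sketch of that validation logic (the class and names below are illustrative stand-ins, not the Doris classes):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for LoadStmt's version check; mirrors the diff's logic only.
    public class VersionPropertyCheck {
        static final String VERSION_KEY = "version";   // LoadStmt.VERSION in the patch
        static final String V2 = "v2";                 // LoadManager.VERSION in the patch

        static String resolveVersion(Map<String, String> properties) {
            String version = "v1"; // default: legacy load path
            String versionProperty = properties.get(VERSION_KEY);
            if (versionProperty != null) {
                if (!versionProperty.equalsIgnoreCase(V2)) {
                    throw new IllegalArgumentException(VERSION_KEY + " must be " + V2);
                }
                version = V2;
            }
            return version;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            System.out.println(resolveVersion(props)); // v1 (property absent)
            props.put(VERSION_KEY, "v2");
            System.out.println(resolveVersion(props)); // v2 -> routed to LoadManager
        }
    }

Note that the patch itself keeps the resolved version in a static field on LoadStmt, so concurrent LOAD statements would share one value; the sketch returns it per call instead.
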
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index b01c2b98cf965e..17de41452519ce 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1080,6 +1080,9 @@ private void transferToMaster() throws IOException {
         LoadChecker.init(Config.load_checker_interval_second * 1000L);
         LoadChecker.startAll();
 
+        // New load scheduler
+        loadJobScheduler.start();
+
         // Export checker
         ExportChecker.init(Config.export_checker_interval_second * 1000L);
         ExportChecker.startAll();
diff --git a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index 1082502a9cbc33..d07f1d9b0754eb 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -60,7 +60,7 @@ public ProcResult fetchResult() throws AnalysisException {
         result.setNames(TITLE_NAMES);
 
         LinkedList<List<Comparable>> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
-                                                                             null, false, null, null);
+                                                                             null, false, null);
         int counter = 0;
         Iterator<List<Comparable>> iterator = loadJobInfos.descendingIterator();
         while (iterator.hasNext()) {
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index 3368a616011713..9399c229c729ba 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -1399,7 +1399,7 @@ public List<AsyncDeleteJob> getCopiedAsyncDeleteJobs() {
     }
 
     public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
-            boolean accurateMatch, Set<JobState> states, ArrayList<OrderByPair> orderByPairs) {
+            boolean accurateMatch, Set<JobState> states) {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
         readLock();
         try {
@@ -1549,15 +1549,6 @@ public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
             readUnlock();
         }
 
-        ListComparator<List<Comparable>> comparator = null;
-        if (orderByPairs != null) {
-            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
-            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
-        } else {
-            // sort by id asc
-            comparator = new ListComparator<List<Comparable>>(0);
-        }
-        Collections.sort(loadJobInfos, comparator);
         return loadJobInfos;
     }
.add("task_id", attachment.getTaskId()) .add("error_msg", "this is a duplicated callback of pending task " + "when broker already has loading task") .build()); return; } - isLoading = true; + + // add task id into finishedTaskIds + finishedTaskIds.add(attachment.getTaskId()); } finally { writeUnlock(); } - Database db = null; try { - db = getDb(); + Database db = getDb(); createLoadingTask(db, attachment); } catch (UserException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("database_id", dbId) .add("error_msg", "Failed to divide job into loading task.") .build(), e); - cancelJobWithoutCheck(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage())); return; } @@ -208,25 +210,51 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { return; } + // check if task has been finished + if (finishedTaskIds.contains(attachment.getTaskId())) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("task_id", attachment.getTaskId()) + .add("error_msg", "this is a duplicated callback of loading task").build()); + return; + } + // update loading status + finishedTaskIds.add(attachment.getTaskId()); updateLoadingStatus(attachment); // begin commit txn when all of loading tasks have been finished - if (!(tasks.size() == tasks.stream() - .filter(entity -> entity.isFinished()).count())) { + if (finishedTaskIds.size() != tasks.size()) { return; } // check data quality if (!checkDataQuality()) { - executeCancel(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG); + executeCancel(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG)); return; } } finally { writeUnlock(); } + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("commit_infos", Joiner.on(",").join(commitInfos)) + .build()); + } + + Database db = null; + try { + db = getDb(); + } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "db has been deleted when job is loading") + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage())); + } + db.writeLock(); try { + Catalog.getCurrentGlobalTransactionMgr().commitTransaction( dbId, transactionId, commitInfos); } catch (UserException e) { @@ -234,8 +262,10 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { .add("database_id", dbId) .add("error_msg", "Failed to commit txn.") .build(), e); - cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage())); return; + } finally { + db.writeUnlock(); } } @@ -248,15 +278,17 @@ private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) { loadingStatus.setTrackingUrl(attachment.getTrackingUrl()); } commitInfos.addAll(attachment.getCommitInfoList()); - int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count(); - progress = finishedTaskNum / tasks.size() * 100; + progress = finishedTaskIds.size() / tasks.size() * 100; if (progress == 100) { progress = 99; } } private String increaseCounter(String key, String deltaValue) { - long value = Long.valueOf(loadingStatus.getCounters().get(key)); + long value = 0; + if (loadingStatus.getCounters().containsKey(key)) { + value = Long.valueOf(loadingStatus.getCounters().get(key)); + } if (deltaValue != null) { value += Long.valueOf(deltaValue); } diff --git 
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index e178cd34a8e843..3f2a6bd5b51e71 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -26,6 +26,7 @@
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.FailMsg;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
 import com.google.common.collect.Lists;
@@ -47,9 +48,10 @@ public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback,
                              Map<Long, List<List<TBrokerFileStatus>>> tableToBrokerFileList,
                              BrokerDesc brokerDesc) {
         super(loadTaskCallback);
-        this.attachment = new BrokerPendingTaskAttachment();
+        this.attachment = new BrokerPendingTaskAttachment(signature);
         this.tableToBrokerFileList = tableToBrokerFileList;
         this.brokerDesc = brokerDesc;
+        this.failMsg = new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, null);
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
index 052d1e19955a01..1d3b042cf5a42d 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
@@ -25,14 +25,15 @@
 import java.util.List;
 import java.util.Map;
 
-public class BrokerLoadingTaskAttachment implements TaskAttachment {
+public class BrokerLoadingTaskAttachment extends TaskAttachment {
 
     private Map<String, String> counters;
     private String trackingUrl;
     private List<TabletCommitInfo> commitInfoList;
 
-    public BrokerLoadingTaskAttachment(Map<String, String> counters, String trackingUrl,
+    public BrokerLoadingTaskAttachment(long taskId, Map<String, String> counters, String trackingUrl,
                                        List<TabletCommitInfo> commitInfoList) {
+        super(taskId);
         this.trackingUrl = trackingUrl;
         this.counters = counters;
         this.commitInfoList = commitInfoList;
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java
index 55956fa67a1620..5d7c5db70a0167 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java
@@ -27,13 +27,17 @@
 import java.util.List;
 import java.util.Map;
 
-public class BrokerPendingTaskAttachment implements TaskAttachment {
+public class BrokerPendingTaskAttachment extends TaskAttachment {
 
     // table id -> file status
     private Map<Long, List<List<TBrokerFileStatus>>> fileStatusMap = Maps.newHashMap();
     // table id -> total file num
     private Map<Long, Integer> fileNumMap = Maps.newHashMap();
 
+    public BrokerPendingTaskAttachment(long taskId) {
+        super(taskId);
+    }
+
     public void addFileStatus(long tableId, List<List<TBrokerFileStatus>> fileStatusList) {
         fileStatusMap.put(tableId, fileStatusList);
         fileNumMap.put(tableId, fileStatusList.stream().mapToInt(entity -> entity.size()).sum());
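
TaskAttachment turning from a marker interface into a base class carrying the producing task's id is what makes the de-duplication above possible: every attachment can be traced back to the task that produced it. A compact illustration of the pattern (class names here are hypothetical, not the Doris ones):

    // Attachment base carries the producing task's id; subclasses add payload.
    class Attachment {
        private final long taskId;
        Attachment(long taskId) { this.taskId = taskId; }
        long getTaskId() { return taskId; }
    }

    class PendingAttachment extends Attachment {
        PendingAttachment(long taskId) { super(taskId); }
    }

    public class AttachmentDemo {
        public static void main(String[] args) {
            long signature = 42L; // in the patch, tasks get this from Catalog.getNextId()
            Attachment attachment = new PendingAttachment(signature);
            // The job can now de-duplicate callbacks by attachment.getTaskId().
            System.out.println(attachment.getTaskId());
        }
    }
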
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 0351798f6b6355..ebc9fdacca8e66 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -32,6 +32,7 @@
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.Load;
@@ -46,14 +47,17 @@
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback {
@@ -68,6 +72,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback {
     protected long dbId;
     protected String label;
     protected JobState state = JobState.PENDING;
+    protected org.apache.doris.load.LoadJob.EtlJobType jobType;
 
     // optional properties
     // timeout second need to be reset in constructor of subclass
@@ -83,6 +88,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback {
     protected long transactionId;
     protected FailMsg failMsg;
     protected List<LoadTask> tasks = Lists.newArrayList();
+    protected Set<Long> finishedTaskIds = Sets.newHashSet();
     protected EtlStatus loadingStatus = new EtlStatus();
     // 0: the job status is pending
     // n/100: n is the number of task which has been finished
@@ -154,13 +160,8 @@ public long getFinishTimestamp() {
         return finishTimestamp;
     }
 
-    public boolean isFinished() {
-        readLock();
-        try {
-            return state == JobState.FINISHED || state == JobState.CANCELLED;
-        } finally {
-            readUnlock();
-        }
+    protected boolean isFinished() {
+        return state == JobState.FINISHED || state == JobState.CANCELLED;
     }
 
     protected void setJobProperties(Map<String, String> properties) throws DdlException {
@@ -255,7 +256,7 @@ public void processTimeout() {
             if (isFinished() || getDeadlineMs() >= System.currentTimeMillis() || isCommitting) {
                 return;
             }
-            executeCancel(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel");
+            executeCancel(new FailMsg(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel"));
         } finally {
             writeUnlock();
         }
@@ -290,16 +291,16 @@ private void executeLoad() {
         state = JobState.LOADING;
     }
 
-    public void cancelJobWithoutCheck(FailMsg.CancelType cancelType, String errMsg) {
+    public void cancelJobWithoutCheck(FailMsg failMsg) {
         writeLock();
         try {
-            executeCancel(cancelType, errMsg);
+            executeCancel(failMsg);
         } finally {
             writeUnlock();
         }
     }
 
-    public void cancelJob(FailMsg.CancelType cancelType, String errMsg) throws DdlException {
+    public void cancelJob(FailMsg failMsg) throws DdlException {
         writeLock();
         try {
             if (isCommitting) {
@@ -308,15 +309,15 @@ public void cancelJob(FailMsg failMsg) throws DdlException {
                         + "The job could not be cancelled in this step").build());
                 throw new DdlException("Job could not be cancelled while txn is committing");
             }
-            executeCancel(cancelType, errMsg);
+            executeCancel(failMsg);
         } finally {
             writeUnlock();
         }
     }
 
-    protected void executeCancel(FailMsg.CancelType cancelType, String errMsg) {
+    protected void executeCancel(FailMsg failMsg) {
         LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                .add("error_msg", "Failed to execute load with error " + errMsg)
+                .add("error_msg", "Failed to execute load with error " + failMsg.getMsg())
                 .build());
 
         // reset txn id
@@ -325,7 +326,6 @@ protected void executeCancel(FailMsg failMsg) {
         }
 
         // clean the loadingStatus
-        loadingStatus.reset();
         loadingStatus.setState(TEtlState.CANCELLED);
 
         // tasks will not be removed from task pool.
@@ -333,7 +333,7 @@ protected void executeCancel(FailMsg failMsg) {
         tasks.clear();
 
         // set failMsg and state
-        failMsg = new FailMsg(cancelType, errMsg);
+        this.failMsg = failMsg;
         finishTimestamp = System.currentTimeMillis();
         state = JobState.CANCELLED;
@@ -365,6 +365,69 @@ protected boolean checkDataQuality() {
         return true;
     }
 
+    public List<Comparable> getShowInfo() {
+        readLock();
+        try {
+            List<Comparable> jobInfo = Lists.newArrayList();
+            // jobId
+            jobInfo.add(id);
+            // label
+            jobInfo.add(label);
+            // state
+            jobInfo.add(state.name());
+
+            // progress
+            switch (state) {
+                case PENDING:
+                    jobInfo.add("ETL:N/A; LOAD:0%");
+                    break;
+                case CANCELLED:
+                    jobInfo.add("ETL:N/A; LOAD:N/A");
+                    break;
+                default:
+                    jobInfo.add("ETL:N/A; LOAD:" + progress + "%");
+                    break;
+            }
+
+            // type
+            jobInfo.add(jobType);
+
+            // etl info
+            if (loadingStatus.getCounters().size() == 0) {
+                jobInfo.add("N/A");
+            } else {
+                jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadingStatus.getCounters()));
+            }
+
+            // task info
+            jobInfo.add("cluster:N/A" + "; timeout(s):" + timeoutSecond
+                    + "; max_filter_ratio:" + maxFilterRatio);
+
+            // error msg
+            if (failMsg == null) {
+                jobInfo.add("N/A");
+            } else {
+                jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
+            }
+
+            // create time
+            jobInfo.add(TimeUtils.longToTimeString(createTimestamp));
+            // etl start time
+            jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp));
+            // etl end time
+            jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp));
+            // load start time
+            jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp));
+            // load end time
+            jobInfo.add(TimeUtils.longToTimeString(finishTimestamp));
+            // tracking url
+            jobInfo.add(loadingStatus.getTrackingUrl());
+            return jobInfo;
+        } finally {
+            readUnlock();
+        }
+    }
+
     @Override
     public long getCallbackId() {
         return id;
@@ -418,7 +481,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
                 return;
             }
             // cancel load job
-            executeCancel(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason);
+            executeCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason));
         } finally {
             writeUnlock();
         }
@@ -426,7 +489,7 @@
     @Override
     public void replayOnAborted(TransactionState txnState) {
-        cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, null);
+        cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, null));
     }
 
     @Override
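
Threading one FailMsg object through cancelJob/cancelJobWithoutCheck/executeCancel, instead of separate CancelType and errMsg arguments, means the failing task can build the cancel reason once and the job stores it unchanged. FailMsg itself lives in org.apache.doris.load; the sketch below only mirrors the constructors and accessors the diff actually calls, so treat the field layout as an assumption:

    // Illustrative stand-in for org.apache.doris.load.FailMsg, inferred from
    // the getCancelType()/getMsg()/setMsg() calls in this patch.
    public class FailMsgSketch {
        enum CancelType { TIMEOUT, ETL_RUN_FAIL, ETL_SUBMIT_FAIL, ETL_QUALITY_UNSATISFIED, LOAD_RUN_FAIL }

        private CancelType cancelType;
        private String msg;

        FailMsgSketch() {}

        FailMsgSketch(CancelType cancelType, String msg) {
            this.cancelType = cancelType;
            this.msg = msg;
        }

        CancelType getCancelType() { return cancelType; }
        String getMsg() { return msg; }
        void setMsg(String msg) { this.msg = msg; }

        public static void main(String[] args) {
            FailMsgSketch failMsg = new FailMsgSketch(CancelType.TIMEOUT, "loading timeout to cancel");
            // A job can log and store the same object the task produced:
            System.out.println("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
        }
    }
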
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index 9d72b8aa66bc08..74b2f4f54abe76 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -74,7 +74,7 @@ private void process() throws InterruptedException {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
                         .add("error_msg", "There are error properties in job. Job will be cancelled")
                         .build(), e);
-                loadJob.cancelJobWithoutCheck(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage());
+                loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()));
                 continue;
             } catch (BeginTransactionException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 0b91a72fe916b8..9556db80a0d903 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -23,11 +23,14 @@
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.Load;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TBrokerFileStatus;
@@ -67,6 +70,7 @@ public LoadLoadingTask(Database db, OlapTable table,
         this.jobDeadlineMs = jobDeadlineMs;
         this.execMemLimit = execMemLimit;
         this.txnId = txnId;
+        this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, null);
     }
 
     public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
@@ -78,7 +82,7 @@ public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
     protected void executeTask() throws UserException {
         int retryTime = 3;
         for (int i = 0; i < retryTime; ++i) {
-            isFinished = executeOnce();
+            boolean isFinished = executeOnce();
             if (isFinished) {
                 return;
             }
@@ -90,7 +94,7 @@ private boolean executeOnce() {
         // New one query id,
         UUID uuid = UUID.randomUUID();
         TUniqueId executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        Coordinator curCoordinator = new Coordinator(-1L, executeId, planner.getDescTable(),
+        Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), executeId, planner.getDescTable(),
                 planner.getFragments(), planner.getScanNodes(), db.getClusterName());
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
@@ -99,10 +103,11 @@ private boolean executeOnce() {
         try {
             QeProcessorImpl.INSTANCE
                     .registerQuery(executeId, curCoordinator);
-            return actualExecute(curCoordinator);
-        } catch (UserException e) {
+            actualExecute(curCoordinator);
+            return true;
+        } catch (Exception e) {
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
-                    .add("error_msg", "failed to execute loading task")
+                    .add("error_msg", "coordinator execute failed in loading task")
                     .build(), e);
             errMsg = e.getMessage();
             return false;
@@ -111,36 +116,25 @@ private boolean executeOnce() {
         }
     }
 
-    private boolean actualExecute(Coordinator curCoordinator) {
+    private void actualExecute(Coordinator curCoordinator) throws Exception {
         int waitSecond = (int) (getLeftTimeMs() / 1000);
         if (waitSecond <= 0) {
-            errMsg = "time out";
-            return false;
+            throw new LoadException("failed to execute plan when the left time is less than 0");
         }
 
-        try {
-            curCoordinator.exec();
-        } catch (Exception e) {
-            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
-                    .add("error_msg", "coordinator execute failed")
-                    .build(), e);
-            errMsg = "coordinator execute failed with error " + e.getMessage();
-            return false;
-        }
+        curCoordinator.exec();
 
         if (curCoordinator.join(waitSecond)) {
             Status status = curCoordinator.getExecStatus();
             if (status.ok()) {
-                attachment = new BrokerLoadingTaskAttachment(curCoordinator.getLoadCounters(),
+                attachment = new BrokerLoadingTaskAttachment(signature,
+                        curCoordinator.getLoadCounters(),
                         curCoordinator.getTrackingUrl(),
                         TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
-                return true;
             } else {
-                errMsg = status.getErrorMsg();
-                return false;
+                throw new LoadException(status.getErrorMsg());
             }
         } else {
-            errMsg = "coordinator could not finished before job timeout";
-            return false;
+            throw new LoadException("coordinator could not finish before job timeout");
         }
     }
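
actualExecute now reports failure by throwing rather than by returning false and stashing errMsg, so executeOnce's catch block is the single place an error gets recorded, and the retry loop in executeTask stays a plain counter. A runnable reduction of that control flow (the throwing method below stands in for Coordinator.exec/join; all names are illustrative):

    // Illustrative retry-on-exception flow, condensed from LoadLoadingTask.
    public class RetryOnException {
        static int attempts = 0;

        // Failure is reported by throwing, as in the reworked actualExecute().
        static void actualExecute() throws Exception {
            attempts++;
            if (attempts < 3) {
                throw new Exception("coordinator could not finish before job timeout");
            }
        }

        static boolean executeOnce() {
            try {
                actualExecute();
                return true;
            } catch (Exception e) {
                // single place the error message is recorded
                System.out.println("attempt " + attempts + " failed: " + e.getMessage());
                return false;
            }
        }

        public static void main(String[] args) throws Exception {
            int retryTime = 3; // same bound the task uses
            for (int i = 0; i < retryTime; ++i) {
                if (executeOnce()) {
                    System.out.println("succeeded on attempt " + attempts);
                    return;
                }
            }
            throw new Exception("all retries failed");
        }
    }
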
errMsg = "coordinator execute failed with error " + e.getMessage(); - return false; - } + curCoordinator.exec(); if (curCoordinator.join(waitSecond)) { Status status = curCoordinator.getExecStatus(); if (status.ok()) { - attachment = new BrokerLoadingTaskAttachment(curCoordinator.getLoadCounters(), + attachment = new BrokerLoadingTaskAttachment(signature, + curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl(), TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos())); - return true; } else { - errMsg = status.getErrorMsg(); - return false; + throw new LoadException(status.getErrorMsg()); } } else { - errMsg = "coordinator could not finished before job timeout"; - return false; + throw new LoadException("coordinator could not finished before job timeout"); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d421cf564dc195..28375289e5917f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -26,15 +26,24 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.util.LogBuilder; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.transaction.TransactionState; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -43,6 +52,9 @@ * The broker and mini load jobs(v2) are included in this class. */ public class LoadManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + public static final String VERSION = "v2"; + private Map idToLoadJob = Maps.newConcurrentMap(); private Map>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap(); private LoadJobScheduler loadJobScheduler; @@ -118,10 +130,67 @@ public void processTimeoutJobs() { idToLoadJob.values().stream().forEach(entity -> entity.processTimeout()); } + /** + * This method will return the jobs info which can meet the condition of input param. + * @param dbId used to filter jobs which belong to this db + * @param labelValue used to filter jobs which's label is or like labelValue. + * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself. + * @param statesValue used to filter jobs which's state within the statesValue set. + * @return The result is the list of jobInfo. + * JobInfo is a List which includes the comparable object: jobId, label, state etc. + * The result is unordered. 
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index 156745430738af..3a95006aabd25f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -20,13 +20,15 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.load.FailMsg;
 import org.apache.doris.task.MasterTask;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public abstract class LoadTask extends MasterTask {
 
@@ -34,37 +36,34 @@ public abstract class LoadTask extends MasterTask {
     protected LoadTaskCallback callback;
     protected TaskAttachment attachment;
-    protected boolean isFinished = false;
+    protected FailMsg failMsg = new FailMsg();
 
     public LoadTask(LoadTaskCallback callback) {
+        this.signature = Catalog.getCurrentCatalog().getNextId();
         this.callback = callback;
     }
 
     @Override
     protected void exec() {
-        Exception exception = null;
+        boolean isFinished = false;
         try {
             // execute pending task
             executeTask();
-            isFinished = true;
             // callback on pending task finished
             callback.onTaskFinished(attachment);
+            isFinished = true;
         } catch (Exception e) {
-            exception = e;
+            failMsg.setMsg(e.getMessage());
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                     .add("error_msg", "Failed to execute load task").build(), e);
         } finally {
             if (!isFinished) {
                 // callback on pending task failed
-                callback.onTaskFailed(exception == null ? "unknown error" : exception.getMessage());
+                callback.onTaskFailed(failMsg);
             }
         }
     }
 
-    public boolean isFinished() {
-        return isFinished;
-    }
-
     /**
      * execute load task
     *
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java
index 838867dd4e1dcb..de9cfd41b3906d 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java
@@ -20,10 +20,12 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.load.FailMsg;
+
 public interface LoadTaskCallback {
 
     long getCallbackId();
 
     void onTaskFinished(TaskAttachment attachment);
 
-    void onTaskFailed(String errMsg);
+    void onTaskFailed(FailMsg failMsg);
 }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 6bf852a388ed36..9abea3bcd44f8d 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -44,6 +44,7 @@
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
 import org.apache.logging.log4j.LogManager;
@@ -104,7 +105,9 @@ public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
                 fileStatusesList, filesAdded);
         scanNode.setLoadInfo(table, brokerDesc, fileGroups);
         scanNode.init(analyzer);
+        scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
+        descTable.computeMemLayout();
 
         // 2. Olap table sink
         String partitionNames = convertBrokerDescPartitionInfo();
@@ -147,9 +150,15 @@ public List<ScanNode> getScanNodes() {
     private String convertBrokerDescPartitionInfo() {
         String result = "";
         for (BrokerFileGroup brokerFileGroup : fileGroups) {
+            if (brokerFileGroup.getPartitionNames() == null) {
+                continue;
+            }
             result += Joiner.on(",").join(brokerFileGroup.getPartitionNames());
             result += ",";
         }
+        if (Strings.isNullOrEmpty(result)) {
+            return null;
+        }
         result = result.substring(0, result.length() - 2);
         return result;
     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java
index 14363fb72ee136..422843cb7fd40a 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java
@@ -20,5 +20,14 @@
 
 package org.apache.doris.load.loadv2;
 
-public interface TaskAttachment {
+public class TaskAttachment {
+    private long taskId;
+
+    public TaskAttachment(long taskId) {
+        this.taskId = taskId;
+    }
+
+    public long getTaskId() {
+        return taskId;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 518f8b1a1f0437..c0073df93961ad 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -50,7 +50,6 @@
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TxnStateChangeCallback;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index dfc90a54e68152..e85ce1a7c713bc 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -67,6 +67,7 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.load.LoadJob.EtlJobType;
+import org.apache.doris.load.loadv2.LoadManager;
 
 /**
  * Created by zhaochun on 14/11/10.
@@ -112,7 +113,12 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt, String origStmt) throws Exception {
                 }
                 jobType = EtlJobType.HADOOP;
             }
-            catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis());
+            // TODO(ml): WIP
+            if (loadStmt.getVersion().equals(LoadManager.VERSION)) {
+                catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
+            } else {
+                catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis());
+            }
         } else if (ddlStmt instanceof CancelLoadStmt) {
             catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 532d8847120663..b18331be993f3c 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -90,8 +90,10 @@
 import org.apache.doris.common.proc.PartitionsProcDir;
 import org.apache.doris.common.proc.ProcNodeInterface;
 import org.apache.doris.common.proc.TabletsProcDir;
+import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
@@ -118,10 +120,12 @@
 import java.net.URL;
 import java.net.URLConnection;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 // Execute one show statement.
 public class ShowExecutor {
@@ -644,12 +648,31 @@ private void handleShowLoad() throws AnalysisException {
         }
         long dbId = db.getId();
 
+        // combine the results of load(v1) and loadManager(v2)
         Load load = catalog.getLoadInstance();
         List<List<Comparable>> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(),
                                                                     showStmt.getLabelValue(),
                                                                     showStmt.isAccurateMatch(),
-                                                                    showStmt.getStates(),
-                                                                    showStmt.getOrderByPairs());
+                                                                    showStmt.getStates());
+        Set<String> statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream()
+                .map(entity -> entity.name())
+                .collect(Collectors.toSet());
+        loadInfos.addAll(catalog.getLoadManager().getLoadJobInfosByDb(dbId, showStmt.getLabelValue(),
+                                                                      showStmt.isAccurateMatch(),
+                                                                      statesValue));
+
+        // order the result list by the orderByPairs from the show stmt
+        List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
+        ListComparator<List<Comparable>> comparator = null;
+        if (orderByPairs != null) {
+            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
+            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
+        } else {
+            // sort by id asc
+            comparator = new ListComparator<List<Comparable>>(0);
+        }
+        Collections.sort(loadInfos, comparator);
+
         List<List<String>> rows = Lists.newArrayList();
         for (List<Comparable> loadInfo : loadInfos) {
             List<String> oneInfo = new ArrayList<String>(loadInfo.size());
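
Because Load (v1) no longer sorts its results, SHOW LOAD has to order the merged v1+v2 rows itself; ListComparator compares List<Comparable> rows column by column for the requested column indexes. A minimal stand-in that sorts rows by one column index (the real ListComparator/OrderByPair live in org.apache.doris.common.util; this is only a sketch of the idea):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // Minimal stand-in for sorting show-load rows by a column index,
    // as ShowExecutor now does after merging v1 and v2 job infos.
    public class RowSorter {
        @SuppressWarnings({"unchecked", "rawtypes"})
        static Comparator<List<Comparable>> byColumn(int index) {
            return (a, b) -> a.get(index).compareTo(b.get(index));
        }

        public static void main(String[] args) {
            List<List<Comparable>> rows = new ArrayList<>();
            rows.add(new ArrayList<>(Arrays.asList(2L, "label_b", "LOADING")));
            rows.add(new ArrayList<>(Arrays.asList(1L, "label_a", "FINISHED")));
            Collections.sort(rows, byColumn(0)); // default: job id ascending
            System.out.println(rows);
        }
    }
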
diff --git a/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java b/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java
index 91f6f43d43b854..06c4e908f2d82d 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TabletCommitInfo.java
@@ -21,6 +21,7 @@
 import org.apache.doris.thrift.TTabletCommitInfo;
 
 import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -65,4 +66,10 @@ public void readFields(DataInput in) throws IOException {
         tabletId = in.readLong();
         backendId = in.readLong();
     }
+
+    @Override
+    public String toString() {
+        Gson gson = new Gson();
+        return gson.toJson(this);
+    }
 }