6 changes: 3 additions & 3 deletions be/src/util/uid_util.cpp
@@ -26,21 +26,21 @@ std::ostream& operator<<(std::ostream& os, const UniqueId& uid) {
 
 std::string print_id(const TUniqueId& id) {
     std::stringstream out;
-    out << std::hex << id.hi << ":" << id.lo;
+    out << std::hex << id.hi << "-" << id.lo;
     return out.str();
 }
 
 std::string print_id(const PUniqueId& id) {
     std::stringstream out;
-    out << std::hex << id.hi() << ":" << id.lo();
+    out << std::hex << id.hi() << "-" << id.lo();
     return out.str();
 }
 
 bool parse_id(const std::string& s, TUniqueId* id) {
     DCHECK(id != NULL);
 
     const char* hi_part = s.c_str();
-    char* colon = const_cast<char*>(strchr(hi_part, ':'));
+    char* colon = const_cast<char*>(strchr(hi_part, '-'));
 
     if (colon == NULL) {
         return false;
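The separator change applies to both printing and parsing, so IDs still round-trip unchanged. A minimal round-trip sketch of the hi-lo hex format, written in Java to match the FE code below (UniqueIdFormat is an illustrative name, not a Doris class):

public final class UniqueIdFormat {
    static String print(long hi, long lo) {
        // mirrors print_id: two hex parts joined by '-'
        return Long.toHexString(hi) + "-" + Long.toHexString(lo);
    }

    static long[] parse(String s) {
        int dash = s.indexOf('-');  // mirrors strchr(hi_part, '-') in parse_id
        if (dash < 0) {
            throw new IllegalArgumentException("missing '-' separator: " + s);
        }
        return new long[] {
                Long.parseUnsignedLong(s.substring(0, dash), 16),
                Long.parseUnsignedLong(s.substring(dash + 1), 16)
        };
    }

    public static void main(String[] args) {
        long[] parsed = parse(print(0x1234L, 0xabcdL));  // "1234-abcd"
        assert parsed[0] == 0x1234L && parsed[1] == 0xabcdL;
    }
}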
21 changes: 21 additions & 0 deletions fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,6 +21,8 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Function;
@@ -57,6 +59,7 @@ public class LoadStmt extends DdlStmt {
     public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag";
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String CLUSTER_PROPERTY = "cluster";
+    private static final String VERSION = "version";
 
     // for load data from Baidu Object Store(BOS)
     public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -78,13 +81,16 @@ public class LoadStmt extends DdlStmt {
     private final Map<String, String> properties;
     private String user;
 
+    private static String version = "v1";
+
     // properties set
     private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(TIMEOUT_PROPERTY)
             .add(MAX_FILTER_RATIO_PROPERTY)
             .add(LOAD_DELETE_FLAG_PROPERTY)
             .add(EXEC_MEM_LIMIT)
             .add(CLUSTER_PROPERTY)
+            .add(VERSION)
             .build();
 
     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
@@ -170,6 +176,17 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
                 throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " is not a number.");
             }
         }
+
+        // version
+        final String versionProperty = properties.get(VERSION);
+        // TODO(ml): only support v1
+        if (versionProperty != null) {
+            if (!versionProperty.equalsIgnoreCase(LoadManager.VERSION)) {
+                throw new DdlException(VERSION + " must be " + LoadManager.VERSION);
+            }
+            version = LoadManager.VERSION;
+        }
+
     }
@@ -203,6 +220,10 @@ public boolean needAuditEncryption() {
         return false;
     }
 
+    public String getVersion() {
+        return version;
+    }
+
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
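The new "version" property is matched case-insensitively against LoadManager.VERSION, and any other value is rejected. A minimal usage sketch of the check (assumes the remaining property checks pass when their keys are absent, as in the hunk above):

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.load.loadv2.LoadManager;

public class VersionPropertyExample {
    public static void main(String[] args) throws DdlException {
        Map<String, String> props = new HashMap<>();
        props.put("version", LoadManager.VERSION);  // accepted; the comparison is case-insensitive
        LoadStmt.checkProperties(props);

        props.put("version", "no-such-version");
        try {
            LoadStmt.checkProperties(props);        // rejected
        } catch (DdlException expected) {
            // message: "version must be " + LoadManager.VERSION
        }
    }
}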
3 changes: 3 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1080,6 +1080,9 @@ private void transferToMaster() throws IOException {
         LoadChecker.init(Config.load_checker_interval_second * 1000L);
         LoadChecker.startAll();
 
+        // New load scheduler
+        loadJobScheduler.start();
+
         // Export checker
         ExportChecker.init(Config.export_checker_interval_second * 1000L);
         ExportChecker.startAll();
@@ -60,7 +60,7 @@ public ProcResult fetchResult() throws AnalysisException {
         result.setNames(TITLE_NAMES);
 
         LinkedList<List<Comparable>> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
-                null, false, null, null);
+                null, false, null);
         int counter = 0;
         Iterator<List<Comparable>> iterator = loadJobInfos.descendingIterator();
         while (iterator.hasNext()) {
11 changes: 1 addition & 10 deletions fe/src/main/java/org/apache/doris/load/Load.java
@@ -1399,7 +1399,7 @@ public List<AsyncDeleteJob> getCopiedAsyncDeleteJobs() {
     }
 
     public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
-            boolean accurateMatch, Set<JobState> states, ArrayList<OrderByPair> orderByPairs) {
+            boolean accurateMatch, Set<JobState> states) {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
         readLock();
         try {
@@ -1549,15 +1549,6 @@ public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName
             readUnlock();
         }
 
-        ListComparator<List<Comparable>> comparator = null;
-        if (orderByPairs != null) {
-            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
-            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
-        } else {
-            // sort by id asc
-            comparator = new ListComparator<List<Comparable>>(0);
-        }
-        Collections.sort(loadJobInfos, comparator);
         return loadJobInfos;
     }
66 changes: 49 additions & 17 deletions fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -37,6 +37,8 @@
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.PullLoadSourceInfo;
 
+import com.google.common.base.Joiner;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -57,13 +59,11 @@ public class BrokerLoadJob extends LoadJob {
     // include broker desc and data desc
     private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo();
 
-    // it will be set to true when pending task finished
-    private boolean isLoading = false;
-
     public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc) {
         super(dbId, label);
         this.timeoutSecond = Config.pull_load_task_default_timeout_second;
         this.brokerDesc = brokerDesc;
+        this.jobType = org.apache.doris.load.LoadJob.EtlJobType.BROKER;
     }
 
     public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
@@ -116,8 +116,8 @@ public void onTaskFinished(TaskAttachment attachment) {
     }
 
     @Override
-    public void onTaskFailed(String errMsg) {
-        cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, errMsg);
+    public void onTaskFailed(FailMsg failMsg) {
+        cancelJobWithoutCheck(failMsg);
     }
 
     /**
@@ -136,28 +136,30 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {
                         .build());
                 return;
             }
-            if (isLoading) {
+            if (finishedTaskIds.contains(attachment.getTaskId())) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                         .add("task_id", attachment.getTaskId())
                         .add("error_msg", "this is a duplicated callback of pending task "
                                 + "when broker already has loading task")
                         .build());
                 return;
             }
-            isLoading = true;
+
+            // add task id into finishedTaskIds
+            finishedTaskIds.add(attachment.getTaskId());
         } finally {
             writeUnlock();
         }
 
-        Database db = null;
         try {
-            db = getDb();
+            Database db = getDb();
             createLoadingTask(db, attachment);
         } catch (UserException e) {
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                     .add("database_id", dbId)
                     .add("error_msg", "Failed to divide job into loading task.")
                     .build(), e);
-            cancelJobWithoutCheck(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage());
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()));
            return;
        }
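The isLoading flag is replaced here by a set of finished task ids, checked and updated under the job's write lock, which makes the callback idempotent per task rather than per job. The idiom as a self-contained sketch (JobSketch is illustrative, not the Doris class):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class JobSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<Long> finishedTaskIds = new HashSet<>();

    // Returns true the first time a task id is seen, false on a duplicated callback.
    boolean markFinishedOnce(long taskId) {
        lock.writeLock().lock();
        try {
            return finishedTaskIds.add(taskId);
        } finally {
            lock.writeLock().unlock();
        }
    }
}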

@@ -208,34 +210,62 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
                 return;
             }
 
+            // check if task has been finished
+            if (finishedTaskIds.contains(attachment.getTaskId())) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("task_id", attachment.getTaskId())
+                        .add("error_msg", "this is a duplicated callback of loading task").build());
+                return;
+            }
+
+            // update loading status
+            finishedTaskIds.add(attachment.getTaskId());
             updateLoadingStatus(attachment);
 
             // begin commit txn when all of loading tasks have been finished
-            if (!(tasks.size() == tasks.stream()
-                    .filter(entity -> entity.isFinished()).count())) {
+            if (finishedTaskIds.size() != tasks.size()) {
                 return;
             }
 
             // check data quality
             if (!checkDataQuality()) {
-                executeCancel(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG);
+                executeCancel(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG));
                 return;
             }
         } finally {
            writeUnlock();
        }
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("commit_infos", Joiner.on(",").join(commitInfos))
+                    .build());
+        }
+
+        Database db = null;
+        try {
+            db = getDb();
+        } catch (MetaNotFoundException e) {
+            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("database_id", dbId)
+                    .add("error_msg", "db has been deleted when job is loading")
+                    .build(), e);
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()));
+        }
+        db.writeLock();
         try {
-
             Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
                     dbId, transactionId, commitInfos);
         } catch (UserException e) {
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                     .add("database_id", dbId)
                     .add("error_msg", "Failed to commit txn.")
                     .build(), e);
-            cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage());
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()));
             return;
-        }
+        } finally {
+            db.writeUnlock();
+        }
     }
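Note the lock ordering in the commit path above: the job's write lock is released before the database write lock is taken, so the transaction commit never holds both locks at once. A self-contained sketch of that shape (illustrative classes, not Doris APIs):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class CommitFlowSketch {
    private final ReentrantReadWriteLock jobLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock dbLock = new ReentrantReadWriteLock();
    private int finishedTasks = 0;
    private final int totalTasks = 2;

    void onTaskFinished() {
        jobLock.writeLock().lock();
        try {
            finishedTasks++;
            if (finishedTasks != totalTasks) {
                return;                        // commit only after the last task
            }
        } finally {
            jobLock.writeLock().unlock();      // job lock released first
        }

        dbLock.writeLock().lock();             // then the db lock for the commit
        try {
            // the real code calls GlobalTransactionMgr.commitTransaction(dbId, transactionId, commitInfos) here
        } finally {
            dbLock.writeLock().unlock();
        }
    }
}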

@@ -248,15 +278,17 @@ private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
             loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
         }
         commitInfos.addAll(attachment.getCommitInfoList());
-        int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count();
-        progress = finishedTaskNum / tasks.size() * 100;
+        progress = finishedTaskIds.size() / tasks.size() * 100;
         if (progress == 100) {
             progress = 99;
         }
     }
 
     private String increaseCounter(String key, String deltaValue) {
-        long value = Long.valueOf(loadingStatus.getCounters().get(key));
+        long value = 0;
+        if (loadingStatus.getCounters().containsKey(key)) {
+            value = Long.valueOf(loadingStatus.getCounters().get(key));
+        }
         if (deltaValue != null) {
             value += Long.valueOf(deltaValue);
         }
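In increaseCounter, the old code called Long.valueOf on a counter that may be absent, which throws NumberFormatException for a missing key; the new code starts absent keys at 0. A standalone sketch of the new behavior (CounterSketch and the "rows" key are illustrative):

import java.util.HashMap;
import java.util.Map;

class CounterSketch {
    static long increase(Map<String, String> counters, String key, String delta) {
        long value = 0;
        if (counters.containsKey(key)) {        // absent key starts from 0
            value = Long.valueOf(counters.get(key));
        }
        if (delta != null) {
            value += Long.valueOf(delta);
        }
        counters.put(key, String.valueOf(value));
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> counters = new HashMap<>();
        increase(counters, "rows", "10");       // key absent: 0 + 10
        increase(counters, "rows", "5");        // 10 + 5 = 15
    }
}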
@@ -26,6 +26,7 @@
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.FailMsg;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
 import com.google.common.collect.Lists;
@@ -47,9 +48,10 @@ public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback,
             Map<Long, List<BrokerFileGroup>> tableToBrokerFileList,
             BrokerDesc brokerDesc) {
         super(loadTaskCallback);
-        this.attachment = new BrokerPendingTaskAttachment();
+        this.attachment = new BrokerPendingTaskAttachment(signature);
         this.tableToBrokerFileList = tableToBrokerFileList;
         this.brokerDesc = brokerDesc;
+        this.failMsg = new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, null);
     }
 
     @Override
@@ -25,14 +25,15 @@
 import java.util.List;
 import java.util.Map;
 
-public class BrokerLoadingTaskAttachment implements TaskAttachment{
+public class BrokerLoadingTaskAttachment extends TaskAttachment {
 
     private Map<String, String> counters;
     private String trackingUrl;
     private List<TabletCommitInfo> commitInfoList;
 
-    public BrokerLoadingTaskAttachment(Map<String, String> counters, String trackingUrl,
+    public BrokerLoadingTaskAttachment(long taskId, Map<String, String> counters, String trackingUrl,
             List<TabletCommitInfo> commitInfoList) {
+        super(taskId);
         this.trackingUrl = trackingUrl;
         this.counters = counters;
         this.commitInfoList = commitInfoList;
@@ -27,13 +27,17 @@
 import java.util.List;
 import java.util.Map;
 
-public class BrokerPendingTaskAttachment implements TaskAttachment {
+public class BrokerPendingTaskAttachment extends TaskAttachment {
 
     // table id -> file status
     private Map<Long, List<List<TBrokerFileStatus>>> fileStatusMap = Maps.newHashMap();
     // table id -> total file num
     private Map<Long, Integer> fileNumMap = Maps.newHashMap();
 
+    public BrokerPendingTaskAttachment(long taskId) {
+        super(taskId);
+    }
+
     public void addFileStatus(long tableId, List<List<TBrokerFileStatus>> fileStatusList) {
         fileStatusMap.put(tableId, fileStatusList);
         fileNumMap.put(tableId, fileStatusList.stream().mapToInt(entity -> entity.size()).sum());
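Both attachment classes now extend TaskAttachment instead of implementing it, and pass a task id to the superclass. The base class itself is not part of this diff; the call sites (super(taskId), attachment.getTaskId()) imply something like the following:

// Sketch only; the actual TaskAttachment file is not shown in this diff.
public abstract class TaskAttachment {
    private final long taskId;

    public TaskAttachment(long taskId) {
        this.taskId = taskId;
    }

    public long getTaskId() {
        return taskId;
    }
}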