Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

public class DuplicatedRequestException extends DdlException {

public DuplicatedRequestException(String msg) {
private String duplicatedRequestId;

public DuplicatedRequestException(String duplicatedRequestId, String msg) {
super(msg);
this.duplicatedRequestId = duplicatedRequestId;
}

public String getDuplicatedRequestId() {
return duplicatedRequestId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@

/**
* There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
* Step1: BrokerPendingTask will be created on method of executeJob.
* Step1: BrokerPendingTask will be created on method of unprotectedExecuteJob.
* Step2: LoadLoadingTasks will be created by the method of onTaskFinished when BrokerPendingTask is finished.
* Step3: CommitAndPublicTxn will be called by the method of onTaskFinished when all of LoadLoadingTasks are finished.
*/
Expand Down Expand Up @@ -179,7 +179,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
}

@Override
protected void executeJob() {
protected void unprotectedExecuteJob() {
LoadTask task = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc);
idToTasks.put(task.getSignature(), task);
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
Expand Down
22 changes: 13 additions & 9 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,23 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
writeLock();
try {
// check if job state is pending
if (state != JobState.PENDING) {
return;
}
// the limit of job will be restrict when begin txn
beginTxn();
executeJob();
unprotectedUpdateState(JobState.LOADING);
unprotectedExecute();
} finally {
writeUnlock();
}
}

public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
// check if job state is pending
if (state != JobState.PENDING) {
return;
}
// the limit of job will be restrict when begin txn
beginTxn();
unprotectedExecuteJob();
unprotectedUpdateState(JobState.LOADING);
}

public void processTimeout() {
writeLock();
try {
Expand All @@ -309,7 +313,7 @@ public void processTimeout() {
logFinalOperation();
}

protected void executeJob() {
protected void unprotectedExecuteJob() {
}

/**
Expand Down
18 changes: 11 additions & 7 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,22 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User
checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp());
loadJob = new MiniLoadJob(database.getId(), request);
createLoadJob(loadJob);
// Mini load job must be executed before release write lock.
// Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
loadJob.unprotectedExecute();
} catch (DuplicatedRequestException e) {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, e.getDuplicatedRequestId())
.add("msg", "the duplicated request returns the txn id "
+ "which was created by the same mini load")
.build());
return dbIdToLabelToLoadJobs.get(database.getId()).get(request.getLabel())
.stream().filter(entity -> entity.getState() != JobState.CANCELLED).findFirst()
.get().getTransactionId();
} finally {
writeUnlock();
}

try {
loadJob.execute();
} catch (UserException e) {
loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), false);
throw e;
} finally {
writeUnlock();
}

// The persistence of mini load must be the final step of create mini load.
Expand Down Expand Up @@ -510,7 +513,8 @@ private void checkLabelUsed(long dbId, String label, long createTimestamp)
if (loadJobOptional.isPresent()) {
LoadJob loadJob = loadJobOptional.get();
if (loadJob.getCreateTimestamp() == createTimestamp) {
throw new DuplicatedRequestException("The request is duplicated with " + loadJob.getId());
throw new DuplicatedRequestException(String.valueOf(loadJob.getId()),
"The request is duplicated with " + loadJob.getId());
}
LOG.warn("Failed to add load job when label {} has been used.", label);
throw new LabelAlreadyUsedException(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {

@Override
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {

transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
Expand Down Expand Up @@ -760,8 +761,9 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc
@Override
public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive stream load put request. db:{}, tbl: {}, txn id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), clientAddr);
LOG.info("receive stream load put request. db:{}, tbl: {}, txn id: {}, load id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), DebugUtil.printId(request.getLoadId()),
clientAddr);
LOG.debug("stream load put request: {}", request);

TStreamLoadPutResult result = new TStreamLoadPutResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void testGetTableNames(@Injectable PullLoadSourceInfo dataSourceInfo,
@Test
public void testExecuteJob(@Mocked MasterTaskExecutor masterTaskExecutor) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
brokerLoadJob.executeJob();
brokerLoadJob.unprotectedExecuteJob();

Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
Assert.assertEquals(1, idToTasks.size());
Expand Down