diff --git a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java index 7da0985a912ec7..e6aa8d3c1633a3 100644 --- a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java +++ b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java @@ -19,7 +19,14 @@ public class DuplicatedRequestException extends DdlException { - public DuplicatedRequestException(String msg) { + private String duplicatedRequestId; + + public DuplicatedRequestException(String duplicatedRequestId, String msg) { super(msg); + this.duplicatedRequestId = duplicatedRequestId; + } + + public String getDuplicatedRequestId() { + return duplicatedRequestId; } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 598fff225b9fbc..f7d06844af7a47 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -59,7 +59,7 @@ /** * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn. - * Step1: BrokerPendingTask will be created on method of executeJob. + * Step1: BrokerPendingTask will be created on method of unprotectedExecuteJob. * Step2: LoadLoadingTasks will be created by the method of onTaskFinished when BrokerPendingTask is finished. * Step3: CommitAndPublicTxn will be called by the method of onTaskFinished when all of LoadLoadingTasks are finished. */ @@ -179,7 +179,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti } @Override - protected void executeJob() { + protected void unprotectedExecuteJob() { LoadTask task = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc); idToTasks.put(task.getSignature(), task); Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 5ed2356e1516fe..90df2f8f95459d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -283,19 +283,23 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { writeLock(); try { - // check if job state is pending - if (state != JobState.PENDING) { - return; - } - // the limit of job will be restrict when begin txn - beginTxn(); - executeJob(); - unprotectedUpdateState(JobState.LOADING); + unprotectedExecute(); } finally { writeUnlock(); } } + public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // check if job state is pending + if (state != JobState.PENDING) { + return; + } + // the limit of job will be restrict when begin txn + beginTxn(); + unprotectedExecuteJob(); + unprotectedUpdateState(JobState.LOADING); + } + public void processTimeout() { writeLock(); try { @@ -309,7 +313,7 @@ public void processTimeout() { logFinalOperation(); } - protected void executeJob() { + protected void unprotectedExecuteJob() { } /** diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 32b76ba84dc96d..455984089b63c3 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -137,19 +137,22 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp()); loadJob = new MiniLoadJob(database.getId(), request); createLoadJob(loadJob); + // Mini load job must be executed before release write lock. + // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun. + loadJob.unprotectedExecute(); } catch (DuplicatedRequestException e) { + LOG.info(new LogBuilder(LogKey.LOAD_JOB, e.getDuplicatedRequestId()) + .add("msg", "the duplicated request returns the txn id " + + "which was created by the same mini load") + .build()); return dbIdToLabelToLoadJobs.get(database.getId()).get(request.getLabel()) .stream().filter(entity -> entity.getState() != JobState.CANCELLED).findFirst() .get().getTransactionId(); - } finally { - writeUnlock(); - } - - try { - loadJob.execute(); } catch (UserException e) { loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), false); throw e; + } finally { + writeUnlock(); } // The persistence of mini load must be the final step of create mini load. @@ -510,7 +513,8 @@ private void checkLabelUsed(long dbId, String label, long createTimestamp) if (loadJobOptional.isPresent()) { LoadJob loadJob = loadJobOptional.get(); if (loadJob.getCreateTimestamp() == createTimestamp) { - throw new DuplicatedRequestException("The request is duplicated with " + loadJob.getId()); + throw new DuplicatedRequestException(String.valueOf(loadJob.getId()), + "The request is duplicated with " + loadJob.getId()); } LOG.warn("Failed to add load job when label {} has been used.", label); throw new LabelAlreadyUsedException(label); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 0c2ce00c0b577e..3137d22396a3e4 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -86,7 +86,6 @@ public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { @Override public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { - transactionId = Catalog.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 810990d30392ab..d6f5e7447d9e2d 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.doris.common.ThriftServerContext; import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.LoadJob; import org.apache.doris.load.MiniEtlTaskInfo; @@ -760,8 +761,9 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc @Override public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws TException { String clientAddr = getClientAddrAsString(); - LOG.info("receive stream load put request. db:{}, tbl: {}, txn id: {}, backend: {}", - request.getDb(), request.getTbl(), request.getTxnId(), clientAddr); + LOG.info("receive stream load put request. db:{}, tbl: {}, txn id: {}, load id: {}, backend: {}", + request.getDb(), request.getTbl(), request.getTxnId(), DebugUtil.printId(request.getLoadId()), + clientAddr); LOG.debug("stream load put request: {}", request); TStreamLoadPutResult result = new TStreamLoadPutResult(); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index b682a4d0951f30..d99cd8685035a2 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -191,7 +191,7 @@ public void testGetTableNames(@Injectable PullLoadSourceInfo dataSourceInfo, @Test public void testExecuteJob(@Mocked MasterTaskExecutor masterTaskExecutor) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); - brokerLoadJob.executeJob(); + brokerLoadJob.unprotectedExecuteJob(); Map idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks"); Assert.assertEquals(1, idToTasks.size());