From 6ce7837948cc25673f4b3f537be5b38149eb9d6c Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 22 Aug 2019 17:11:33 +0800 Subject: [PATCH 1/2] Fix bug that concurrent stream load job may be able to executed concurrently This will cause 2 jobs trying to write same file, and cause file damaged. --- be/src/http/action/mini_load.cpp | 1 + .../stream_load/stream_load_executor.cpp | 1 + .../doris/load/loadv2/BrokerLoadJob.java | 2 +- .../org/apache/doris/load/loadv2/LoadJob.java | 7 +++++++ .../apache/doris/load/loadv2/LoadManager.java | 19 ++++++++++--------- .../apache/doris/load/loadv2/MiniLoadJob.java | 1 + .../load/routineload/RoutineLoadTaskInfo.java | 2 +- .../doris/service/FrontendServiceImpl.java | 3 +-- .../transaction/GlobalTransactionMgr.java | 18 ++++++++++-------- .../doris/transaction/TransactionState.java | 18 +++++++++--------- .../apache/doris/load/loadv2/LoadJobTest.java | 5 +++-- .../transaction/GlobalTransactionMgrTest.java | 9 ++++++--- gensrc/thrift/FrontendService.thrift | 6 ++++-- 13 files changed, 55 insertions(+), 37 deletions(-) diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index b25b07ca2ea111..b529f57dc94a57 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -648,6 +648,7 @@ Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) { request.__set_max_filter_ratio(ctx->max_filter_ratio); } request.__set_create_timestamp(UnixMillis()); + request.__set_request_id(ctx->id.to_thrift()); // begin load by master const TNetworkAddress& master_addr = _exec_env->master_info()->network_address; TMiniLoadBeginResult res; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index dddee816487a02..2ad2fbbf7d93d4 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -128,6 +128,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { if (ctx->timeout_second != -1) { request.__set_timeout(ctx->timeout_second); } + request.__set_request_id(ctx->id.to_thrift()); TLoadTxnBeginResult result; #ifndef BE_TEST diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 2583f04d2198b0..f69759b93349c0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -187,7 +187,7 @@ public Set getTableNames() throws MetaNotFoundException{ @Override public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), + .beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, timeoutSecond); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index bf637998b4ad74..4b995f94126325 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -120,6 +120,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + // this request id is only used for checking if a load begin request is a duplicate request. + protected TUniqueId requestId; + // only for log replay public LoadJob() { } @@ -191,6 +194,10 @@ public long getTransactionId() { return transactionId; } + public TUniqueId getRequestId() { + return requestId; + } + /** * Show table names for frontend * If table name could not be found by id, the table id will be used instead. diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index a2ed3748f3ff8f..69292ba667a993 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -39,6 +39,7 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadBeginRequest; import org.apache.doris.thrift.TMiniLoadRequest; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -96,7 +97,7 @@ public void createLoadJobFromStmt(LoadStmt stmt, String originStmt) throws DdlEx LoadJob loadJob = null; writeLock(); try { - checkLabelUsed(dbId, stmt.getLabel().getLabelName(), -1); + checkLabelUsed(dbId, stmt.getLabel().getLabelName(), null); if (stmt.getBrokerDesc() == null) { throw new DdlException("LoadManager only support the broker load."); } @@ -134,7 +135,7 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User LoadJob loadJob = null; writeLock(); try { - checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp()); + checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp(), request.getRequest_id()); loadJob = new MiniLoadJob(database.getId(), request); createLoadJob(loadJob); // Mini load job must be executed before release write lock. @@ -184,7 +185,7 @@ public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long time Database database = checkDb(stmt.getLabel().getDbName()); writeLock(); try { - checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(), -1); + checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(), null); Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt, jobType, timestamp); } finally { writeUnlock(); @@ -209,7 +210,7 @@ public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlEx Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb())); writeLock(); try { - checkLabelUsed(database.getId(), request.getLabel(), -1); + checkLabelUsed(database.getId(), request.getLabel(), null); return Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(request); } finally { writeUnlock(); @@ -220,7 +221,7 @@ public void createLoadJobV1FromMultiStart(String fullDbName, String label) throw Database database = checkDb(fullDbName); writeLock(); try { - checkLabelUsed(database.getId(), label, -1); + checkLabelUsed(database.getId(), label, null); Catalog.getCurrentCatalog().getLoadInstance() .registerMiniLabel(fullDbName, label, System.currentTimeMillis()); } finally { @@ -501,10 +502,10 @@ private void checkTable(Database database, String tableName) throws DdlException * * @param dbId * @param label - * @param createTimestamp the create timestamp of stmt of request + * @param requestId: the uuid of each txn request from BE * @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job. */ - private void checkLabelUsed(long dbId, String label, long createTimestamp) + private void checkLabelUsed(long dbId, String label, TUniqueId requestId) throws DdlException { // if label has been used in old load jobs Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label); @@ -517,9 +518,9 @@ private void checkLabelUsed(long dbId, String label, long createTimestamp) labelLoadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).findFirst(); if (loadJobOptional.isPresent()) { LoadJob loadJob = loadJobOptional.get(); - if (loadJob.getCreateTimestamp() == createTimestamp) { + if (loadJob.getRequestId() != null && requestId != null && loadJob.getRequestId().equals(requestId)) { throw new DuplicatedRequestException(String.valueOf(loadJob.getId()), - "The request is duplicated with " + loadJob.getId()); + "The request is duplicated with " + loadJob.getId()); } LOG.warn("Failed to add load job when label {} has been used.", label); throw new LabelAlreadyUsedException(label); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 3137d22396a3e4..43975bd13b9669 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -64,6 +64,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound this.createTimestamp = request.getCreate_timestamp(); this.loadStartTimestamp = createTimestamp; this.authorizationInfo = gatherAuthInfo(); + this.requestId = requestId.getRequest_id(); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index dcf12003f03848..997a57a0546642 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -130,7 +130,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti // begin a txn for task RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "FE: " + FrontendOptions.getLocalHostAddress(), + routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), routineLoadJob.getMaxBatchIntervalS() * 2); } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ccdba2aef1f0c9..6b722e869cb4cb 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -646,10 +646,9 @@ private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) thr } // begin - long timestamp = request.isSetTimestamp() ? request.getTimestamp() : -1; long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; return Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), request.getLabel(), timestamp, "BE: " + clientIp, + db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index d88b79deeee3cb..0b7376635dfd50 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -44,6 +44,7 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import com.google.common.base.Joiner; @@ -107,22 +108,22 @@ public TxnStateCallbackFactory getCallbackFactory() { public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType, long timeoutSecond) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { - return beginTransaction(dbId, label, -1, coordinator, sourceType, -1, timeoutSecond); + return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond); } /** * the app could specify the transaction id * - * timestamp is used to judge that whether the request is a internal retry request - * if label already exist, and timestamp are equal, we return the exist tid, and consider this 'begin' + * requestId is used to judge that whether the request is a internal retry request + * if label already exist, and requestId are equal, we return the exist tid, and consider this 'begin' * as success. - * timestamp == -1 is for compatibility + * requestId == null is for compatibility * * @param coordinator * @throws BeginTransactionException * @throws IllegalTransactionParameterException */ - public long beginTransaction(long dbId, String label, long timestamp, + public long beginTransaction(long dbId, String label, TUniqueId requestId, String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { @@ -145,10 +146,11 @@ public long beginTransaction(long dbId, String label, long timestamp, Map txnLabels = dbIdToTxnLabels.row(dbId); if (txnLabels != null && txnLabels.containsKey(label)) { // check timestamp - if (timestamp != -1) { + if (requestId != null) { TransactionState existTxn = getTransactionState(txnLabels.get(label)); if (existTxn != null && existTxn.getTransactionStatus() == TransactionStatus.PREPARE - && existTxn.getTimestamp() == timestamp) { + && existTxn.getRequsetId() != null && existTxn.getRequsetId().equals(requestId)) { + // this may be a retry request for same job, just return existing txn id. return txnLabels.get(label); } } @@ -161,7 +163,7 @@ public long beginTransaction(long dbId, String label, long timestamp, } long tid = idGenerator.getNextTransactionId(); LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator); - TransactionState transactionState = new TransactionState(dbId, tid, label, timestamp, sourceType, + TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); unprotectUpsertTransactionState(transactionState); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index aee9934a3ca38e..b44a81a1142f90 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; import com.google.common.base.Strings; @@ -109,9 +110,9 @@ public String toString() { private long dbId; private long transactionId; private String label; - // timestamp is used to judge whether a begin request is a internal retry request. - // no need to persist it - private long timestamp; + // requsetId is used to judge whether a begin request is a internal retry request. + // no need to persist it. + private TUniqueId requsetId; private Map idToTableCommitInfos; // coordinator is show who begin this txn (FE, or one of BE, etc...) private String coordinator; @@ -146,7 +147,6 @@ public TransactionState() { this.dbId = -1; this.transactionId = -1; this.label = ""; - this.timestamp = -1; this.idToTableCommitInfos = Maps.newHashMap(); this.coordinator = ""; this.transactionStatus = TransactionStatus.PREPARE; @@ -161,12 +161,12 @@ public TransactionState() { this.latch = new CountDownLatch(1); } - public TransactionState(long dbId, long transactionId, String label, long timestamp, + public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId, LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) { this.dbId = dbId; this.transactionId = transactionId; this.label = label; - this.timestamp = timestamp; + this.requsetId = requsetId; this.idToTableCommitInfos = Maps.newHashMap(); this.coordinator = coordinator; this.transactionStatus = TransactionStatus.PREPARE; @@ -215,9 +215,9 @@ public long getPublishVersionTime() { public boolean hasSendTask() { return this.hasSendTask; } - - public long getTimestamp() { - return timestamp; + + public TUniqueId getRequsetId() { + return requsetId; } public long getTransactionId() { diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java index b91b3fd2a3596a..3f5a72289c0b8b 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java @@ -24,11 +24,11 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.load.Load; import org.apache.doris.metric.LongCounterMetric; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.EditLog; import org.apache.doris.task.MasterTaskExecutor; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -104,7 +104,8 @@ public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr, LoadJob loadJob = new BrokerLoadJob(); new Expectations() { { - globalTransactionMgr.beginTransaction(anyLong, anyString, anyLong, anyString, (TransactionState.LoadJobSourceType) any, anyLong, anyLong); + globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString, + (TransactionState.LoadJobSourceType) any, anyLong, anyLong); result = 1; } }; diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index fb5375933dd938..8872e83f056f3d 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -17,7 +17,10 @@ package org.apache.doris.transaction; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; @@ -314,7 +317,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, + TransactionState transactionState = new TransactionState(1L, 1L, "label", null, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); @@ -380,7 +383,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, + TransactionState transactionState = new TransactionState(1L, 1L, "label", null, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 33b6a3abe1aa62..cc90080dddc50d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -455,7 +455,8 @@ struct TMiniLoadBeginRequest { 9: optional i64 timeout_second 10: optional double max_filter_ratio 11: optional i64 auth_code - 12: optional i64 create_timestamp; + 12: optional i64 create_timestamp + 13: optional Types.TUniqueId request_id } struct TIsMethodSupportedRequest { @@ -481,10 +482,11 @@ struct TLoadTxnBeginRequest { 5: required string tbl 6: optional string user_ip 7: required string label - 8: optional i64 timestamp + 8: optional i64 timestamp // deprecated, use request_id instead 9: optional i64 auth_code // The real value of timeout should be i32. i64 ensures the compatibility of interface. 10: optional i64 timeout + 11: optional Types.TUniqueId request_id } struct TLoadTxnBeginResult { From 82ed041bd8a52a5390dad0c3bfe18c4893b85abd Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 22 Aug 2019 18:03:52 +0800 Subject: [PATCH 2/2] fix bug --- .../main/java/org/apache/doris/load/loadv2/LoadManager.java | 2 +- .../main/java/org/apache/doris/load/loadv2/MiniLoadJob.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 69292ba667a993..2cd338199853a4 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -135,7 +135,7 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User LoadJob loadJob = null; writeLock(); try { - checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp(), request.getRequest_id()); + checkLabelUsed(database.getId(), request.getLabel(), request.getRequest_id()); loadJob = new MiniLoadJob(database.getId(), request); createLoadJob(loadJob); // Mini load job must be executed before release write lock. diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 43975bd13b9669..bc75ec860a2f25 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -64,7 +64,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound this.createTimestamp = request.getCreate_timestamp(); this.loadStartTimestamp = createTimestamp; this.authorizationInfo = gatherAuthInfo(); - this.requestId = requestId.getRequest_id(); + this.requestId = request.getRequest_id(); } @Override @@ -88,7 +88,7 @@ public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { @Override public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), + .beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, timeoutSecond); }