diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 58621c77a2a115..051aca3e13044e 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -45,6 +45,7 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "thrift/protocol/TDebugProtocol.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -174,6 +175,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { request.__set_timeout(ctx->timeout_second); } request.__set_request_id(ctx->id.to_thrift()); + request.__set_backend_id(_exec_env->master_info()->backend_id); TLoadTxnBeginResult result; Status status; @@ -309,6 +311,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); + DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index c4ef8f6597f6a3..8a3a0573cc7f3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -59,6 +59,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TUniqueId; @@ -406,7 +407,9 @@ public void analyze(Analyzer analyzer) throws UserException { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(targetTable.getId()), label.getLabelName(), - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } isTransactionBegin = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 0718bda3433d32..297ac9f1746a99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1160,7 +1160,12 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro } @Override - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { + // do nothing in cloud mode + } + + @Override + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) { // do nothing in cloud mode } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java index 5db50a7da891e4..7e913bd9a93589 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java @@ -286,15 +286,15 @@ public static LoadJobFinalOperation loadJobFinalOperationFromPb(TxnCommitAttachm public static TxnCoordinatorPB txnCoordinatorToPb(TxnCoordinator txnCoordinator) { TxnCoordinatorPB.Builder builder = TxnCoordinatorPB.newBuilder(); builder.setSourceType(TxnSourceTypePB.forNumber(txnCoordinator.sourceType.value())); + builder.setId(txnCoordinator.id); builder.setIp(txnCoordinator.ip); + builder.setStartTime(txnCoordinator.startTime); return builder.build(); } public static TxnCoordinator txnCoordinatorFromPb(TxnCoordinatorPB txnCoordinatorPB) { - TxnCoordinator txnCoordinator = new TxnCoordinator(); - txnCoordinator.sourceType = TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()); - txnCoordinator.ip = txnCoordinatorPB.getIp(); - return txnCoordinator; + return new TxnCoordinator(TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()), + txnCoordinatorPB.getId(), txnCoordinatorPB.getIp(), txnCoordinatorPB.getStartTime()); } public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 0cc18e7c73dd35..5767e303f35ebf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -27,6 +27,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -353,6 +354,11 @@ private String getCloudClusterName(HttpServletRequest request) { private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit) throws LoadException { + long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); + if (debugBackendId != -1L) { + Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } if (Config.isCloudMode()) { String cloudClusterName = getCloudClusterName(request); if (Strings.isNullOrEmpty(cloudClusterName)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index c766f107aa1d42..f3915bc6f4cb29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -51,6 +51,7 @@ import org.apache.doris.planner.PartitionPruner; import org.apache.doris.planner.RangePartitionPrunerV2; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -284,8 +285,9 @@ public void setCountDownLatch(MarkedCountDownLatch countDownLatch) { public long beginTxn() throws Exception { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(), Lists.newArrayList(deleteInfo.getTableId()), label, null, - new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second); this.transactionId = txnId; Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 2ffe85bb36aa32..fbe7db45bbffdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -50,6 +50,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -123,7 +124,9 @@ public void beginTxn() QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 61687e1c4b46f0..1d1dc426fda72b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -62,6 +62,7 @@ import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.sparkdpp.DppResult; import org.apache.doris.sparkdpp.EtlJobConfig; @@ -199,7 +200,9 @@ public void beginTxn() QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.FRONTEND, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index cdee942f40833e..d101d98cf850a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -27,6 +27,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; @@ -199,7 +200,9 @@ public boolean beginTxn() throws UserException { try { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), timeoutMs / 1000); } catch (DuplicatedRequestException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 3126984d33a3af..2aea7bad800bf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,7 @@ import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.SyncTask; import org.apache.doris.task.SyncTaskPool; @@ -132,8 +133,10 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce try { long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label, - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, - FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), + sourceType, timeoutSecond); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 32121b9833b480..c452a242dcc46b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -58,6 +58,7 @@ import org.apache.doris.qe.MasterTxnExecutor; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; @@ -193,9 +194,10 @@ private static void beginBatchInsertTransaction(ConnectContext ctx, String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } else { MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 2ed8bae8c3ebd7..b4f5503ed44ca4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -45,6 +45,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TOlapTableLocationParam; @@ -96,7 +97,9 @@ public void beginTransaction() { try { this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); } catch (Exception e) { throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 9d2568c973d3ba..f1cba182df4f07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -189,6 +189,7 @@ import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; @@ -2197,9 +2198,10 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce String label = txnEntry.getLabel(); if (Env.getCurrentEnv().isMaster()) { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); txnConf.setTxnId(txnId); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e05b126ca3b041..76a191dfbbbc69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1217,7 +1217,9 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; - TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp); + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); if (request.isSetToken()) { txnCoord.isFromInternal = true; } @@ -1325,10 +1327,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp) // step 5: get timeout long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); // step 6: begin transaction long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), tableIdList, request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), + db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); // step 7: return result @@ -2105,6 +2109,25 @@ private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResu httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode); httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode); httpStreamParams.getParams().setNumLocalSink(1); + + TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState( + httpStreamParams.getDb().getId(), httpStreamParams.getTxnId()); + if (txnState == null) { + LOG.warn("Not found http stream related txn, txn id = {}", httpStreamParams.getTxnId()); + } else { + TxnCoordinator txnCoord = txnState.getCoordinator(); + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + if (backend != null) { + // only modify txnCoord in memory, not write editlog yet. + txnCoord.sourceType = TxnSourceType.BE; + txnCoord.id = backend.getId(); + txnCoord.ip = backend.getHost(); + txnCoord.startTime = backend.getLastStartTime(); + LOG.info("Change http stream related txn {} to coordinator {}", + httpStreamParams.getTxnId(), txnCoord); + } + } + result.setPipelineParams(httpStreamParams.getParams()); result.getPipelineParams().setDbName(httpStreamParams.getDb().getFullName()); result.getPipelineParams().setTableName(httpStreamParams.getTable().getName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 8bf20844ea1ca3..5f49e88ba6a0c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -170,14 +170,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { BackendHbResponse hbResponse = (BackendHbResponse) response; Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { + long oldStartTime = be.getLastStartTime(); boolean isChanged = be.handleHbResponse(hbResponse, isReplay); - if (hbResponse.getStatus() != HbStatus.OK) { + if (hbResponse.getStatus() == HbStatus.OK) { + long newStartTime = be.getLastStartTime(); + if (!isReplay && oldStartTime != newStartTime) { + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( + be.getId(), be.getHost(), newStartTime); + } + } else { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) { - Env.getCurrentGlobalTransactionMgr() - .abortTxnWhenCoordinateBeDown(be.getHost(), 100); + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown( + be.getId(), be.getHost(), 100); } } return isChanged; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index a5c23a63a7aaf5..3d4bddbc6606fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2036,13 +2036,16 @@ public TransactionState getTransactionStateByCallbackId(long callbackId) { return null; } - public List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + public List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); readLock(); try { idToRunningTransactionState.values().stream() .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE - && t.getCoordinator().ip.equals(coordinateHost))) + && t.getTransactionStatus() == TransactionStatus.PREPARE + && t.getCoordinator().ip.equals(coordinateHost) + && (t.getCoordinator().id == 0 || t.getCoordinator().id == coordinateBeId))) .limit(limit) .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), t.getTransactionId()))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index ae1a467205f10e..719999b809428a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -566,20 +566,35 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes); } + @Override + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { + List> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE); + for (Pair txnInfo : transactionIdByCoordinateBe) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); + TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); + long coordStartTime = transactionState.getCoordinator().startTime; + if (coordStartTime > 0 && coordStartTime < beStartTime) { + dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null); + } + } catch (UserException e) { + LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); + } + } + } + /** * If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout * So when FE identify the Coordinate BE is down, FE should cancel it initiative */ @Override - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { - List> transactionIdByCoordinateBe = getTransactionIdByCoordinateBe(coordinateHost, limit); + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) { + List> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit); for (Pair txnInfo : transactionIdByCoordinateBe) { try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); - TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); - if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { - continue; - } dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); @@ -763,11 +778,12 @@ private TransactionState getTransactionStateByCallbackId(long dbId, long callbac } } - @Deprecated - protected List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + protected List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { - txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit)); + txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe( + coordinateBeId, coordinateHost, limit)); if (txnInfos.size() > limit) { break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java index c8c9ff380379f4..6fd32c8ee7bc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java @@ -134,7 +134,9 @@ public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest requ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) throws AnalysisException; - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit); + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime); + + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit); public TransactionStatus getLabelState(long dbId, String label) throws AnalysisException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index da28dd8250fb6c..85b7ace42ee043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -38,6 +38,7 @@ import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.qe.MasterTxnExecutor; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TLoadTxnBeginRequest; @@ -203,7 +204,9 @@ public long beginTransaction(TableIf table) throws Exception { if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), Lists.newArrayList(table.getId()), label, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, timeoutSecond); } else { String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index f628c945b6cd1e..f03457b0f094c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -166,8 +166,14 @@ public static TxnSourceType valueOf(int flag) { public static class TxnCoordinator { @SerializedName(value = "sourceType") public TxnSourceType sourceType; + // backendId for backend, 0 for frontend + @SerializedName(value = "id") + public long id = 0; @SerializedName(value = "ip") public String ip; + // frontend/backend start time + @SerializedName(value = "st") + public long startTime = 0; // True if this txn if created by system(such as writing data to audit table) @SerializedName(value = "ii") public boolean isFromInternal = false; @@ -175,9 +181,11 @@ public static class TxnCoordinator { public TxnCoordinator() { } - public TxnCoordinator(TxnSourceType sourceType, String ip) { + public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) { this.sourceType = sourceType; + this.id = id; this.ip = ip; + this.startTime = startTime; } @Override @@ -319,7 +327,8 @@ public TransactionState() { this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); - this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE + // mocked, to avoid NPE + this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis()); this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; @@ -758,7 +767,7 @@ public void readFields(DataInput in) throws IOException { TableCommitInfo info = TableCommitInfo.read(in); idToTableCommitInfos.put(info.getTableId(), info); } - txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in)); + txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0); transactionStatus = TransactionStatus.valueOf(in.readInt()); sourceType = LoadJobSourceType.valueOf(in.readInt()); prepareTime = in.readLong(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java index a1aa78d2595cf7..f62078c3050e16 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java @@ -61,7 +61,8 @@ public class CloudGlobalTransactionMgrTest { private static GlobalTransactionMgrIface masterTransMgr; private static Env masterEnv; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index c20464f089dda0..74121e2d51b74d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -67,8 +67,8 @@ public class DatabaseTransactionMgrTest { private static Env slaveEnv; private static Map LabelToTxnId; - private TransactionState.TxnCoordinator transactionSource = - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); public static void setTransactionFinishPublish(TransactionState transactionState, List backendIds) { if (transactionState.getSubTransactionStates() != null) { @@ -141,7 +141,8 @@ public Map addTransactionToTransactionMgr() throws UserException { labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); // txn 2, 3, 4 - TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); + TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.BE, 0, "be1", System.currentTimeMillis()); long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLabel2, beTransactionSource, @@ -221,7 +222,7 @@ public void testAbortTransactionWithNotFoundException() throws UserException { @Test public void testGetTransactionIdByCoordinateBe() throws UserException { DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); - List> transactionInfoList = masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10); + List> transactionInfoList = masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10); Assert.assertEquals(3, transactionInfoList.size()); Assert.assertEquals(CatalogTestUtil.testDbId1, transactionInfoList.get(0).first.longValue()); Assert.assertEquals(TransactionStatus.PREPARE, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index a2826745021cce..a5b2b2115eb965 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -88,7 +88,8 @@ public class GlobalTransactionMgrTest { private static Env masterEnv; private static Env slaveEnv; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); protected static List allBackends = Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3); @@ -302,7 +303,9 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); @@ -366,7 +369,9 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java index 2c038aaff370f7..55faca2a789fa2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -88,8 +88,9 @@ public void testSerDe() throws IOException { UUID uuid = UUID.randomUUID(); TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L, - 60 * 1000L); + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), + 50000L, 60 * 1000L); testSerDe(fileName, transactionState, readTransactionState -> { Assert.assertEquals(transactionState.getCoordinator().ip, readTransactionState.getCoordinator().ip); }); @@ -112,7 +113,8 @@ public void testSerDeForBatchLoad() throws IOException { // TransactionState TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), TransactionStatus.COMMITTED, "", 100, 50000L, loadJobFinalOperation, 100, 200, 300, 400); // check testSerDe(fileName2, transactionState, readTransactionState -> { @@ -144,7 +146,8 @@ public void testSerDeForRoutineLoad() throws IOException { // TransactionState TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), TransactionStatus.COMMITTED, "", 100, 50000L, attachment, 100, 200, 300, 400); // check diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index e1c3c9be5ab876..f285ad3f260ae4 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -270,6 +270,8 @@ enum TxnStatusPB { message TxnCoordinatorPB { optional TxnSourceTypePB sourceType = 1; optional string ip = 2; + optional int64 id = 3; + optional int64 start_time = 4; } message RoutineLoadProgressPB { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 556443a3d0957a..7c563a972720ce 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -614,6 +614,7 @@ struct TLoadTxnBeginRequest { 12: optional string token 13: optional string auth_code_uuid 14: optional i64 table_id + 15: optional i64 backend_id } struct TLoadTxnBeginResult { @@ -636,6 +637,7 @@ struct TBeginTxnRequest { 9: optional i64 timeout 10: optional Types.TUniqueId request_id 11: optional string token + 12: optional i64 backend_id } struct TBeginTxnResult { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 8b408bb7f2f044..9b5090bec928bf 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -902,6 +902,33 @@ class Suite implements GroovyInterceptable { return hdfs.downLoad(label) } + void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") { + def backends = sql_return_maparray "show backends" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id int, + name varchar(255) + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "${backends.size()}" + ) + """ + + streamLoad { + table tableName + set 'column_separator', ',' + file context.config.dataPath + "/demo_p0/streamload_input.csv" + time 10000 + if (!coordidateBeHostPort.equals("")) { + def pos = coordidateBeHostPort.indexOf(':') + def host = coordidateBeHostPort.substring(0, pos) + def httpPort = coordidateBeHostPort.substring(pos + 1).toInteger() + directToBe host, httpPort + } + } + } + void streamLoad(Closure actionSupplier) { runAction(new StreamLoadAction(context), actionSupplier) } diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy b/regression-test/suites/demo_p0/streamLoad_action.groovy index 733483517d02b4..59d12c965a2636 100644 --- a/regression-test/suites/demo_p0/streamLoad_action.groovy +++ b/regression-test/suites/demo_p0/streamLoad_action.groovy @@ -126,6 +126,11 @@ suite("streamLoad_action") { LIMIT 5; """ + def tableName2 = "test_streamload_action2" + runStreamLoadExample(tableName2) + sql """ DROP TABLE ${tableName} """ + sql """ DROP TABLE ${tableName2}""" + sql """ DROP TABLE B """ } diff --git a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy new file mode 100644 index 00000000000000..bb6b0c18a0daf7 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_coordidator_be_restart') { + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + + docker(options) { + def db = context.config.getDbNameByFile(context.file) + def tableName1 = 'tbl_test_coordidator_be_restart_t1' + setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600) + + def dbId = getDbId() + + def tableName2 = 'tbl_test_coordidator_be_restart_t2' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + id int, + name CHAR(10), + dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP, + dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP, + dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP, + dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + """ + + def txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId', [value: coordinatorBe.backendId]) + GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block') + + thread { + try { + runStreamLoadExample(tableName1, coordinatorBe.host + ':' + coordinatorBe.httpPort) + } catch (NoHttpResponseException t) { + // be down will raise NoHttpResponseException + } + } + + thread { + try { + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName2} (id, name) select c1, c2 from http_stream("format"="csv") + """ + time 120 * 1000 + file context.config.dataPath + '/load_p0/http_stream/test_http_stream.csv' + } + } catch (Exception e) { + logger.info('http stream: ' + e) + } + } + + sleep(5000) + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(2, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + // coordinatorBe shutdown not abort txn because abort_txn_after_lost_heartbeat_time_second = 3600 + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(2, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + logger.info('finished txns: ' + txns) + assertEquals(2, txns.size()) + for (def txn : txns) { + assertEquals('ABORTED', txn.TransactionStatus) + } + } +}