From e41cbc644f9e54906c8a7a1a8d02c1044ffb8e92 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 17 Sep 2025 15:13:37 +0800 Subject: [PATCH] fix register callback id invalid --- .../commands/insert/OlapInsertExecutor.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index f41d8893f020e3..b7141dd9ed0928 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -99,26 +99,16 @@ public void beginTransaction() { throw new BeginTransactionException("current running txns on db is larger than limit"); } this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - database.getId(), ImmutableList.of(table.getId()), labelName, null, + database.getId(), ImmutableList.of(table.getId()), labelName, new TxnCoordinator(TxnSourceType.FE, 0, FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()), - LoadJobSourceType.INSERT_STREAMING, getListenerId(), ctx.getExecTimeoutS()); + LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS()); } catch (Exception e) { throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); } } - private long getListenerId() { - long listenerId = -1; - StreamingInsertTask streamingInsertTask = - Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId); - if (streamingInsertTask != null) { - listenerId = streamingInsertTask.getJobId(); - } - return listenerId; - } - @Override public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { OlapTableSink olapTableSink = (OlapTableSink) sink; @@ -199,6 +189,7 @@ protected void beforeExec() { @Override protected void onComplete() throws UserException { + setTxnCallbackId(); if (ctx.getState().getStateType() == MysqlStateType.ERR) { try { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); @@ -234,6 +225,18 @@ protected void onComplete() throws UserException { } } + private void setTxnCallbackId() { + TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); + if (state == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + StreamingInsertTask streamingInsertTask = + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId); + if (streamingInsertTask != null) { + state.setCallbackId(streamingInsertTask.getJobId()); + } + } + @Override protected void onFail(Throwable t) { errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();