Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
Expand Down Expand Up @@ -174,6 +175,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
request.__set_backend_id(_exec_env->master_info()->backend_id);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpstream shoud also be considered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had fix httpstream

TLoadTxnBeginResult result;
Status status;
Expand Down Expand Up @@ -309,6 +311,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
}

Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);

DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);

TLoadTxnCommitRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -406,7 +407,9 @@ public void analyze(Analyzer analyzer) throws UserException {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label.getLabelName(),
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,12 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro
}

@Override
public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) {
public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) {
// do nothing in cloud mode
}

@Override
public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) {
// do nothing in cloud mode
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,15 @@ public static LoadJobFinalOperation loadJobFinalOperationFromPb(TxnCommitAttachm
public static TxnCoordinatorPB txnCoordinatorToPb(TxnCoordinator txnCoordinator) {
TxnCoordinatorPB.Builder builder = TxnCoordinatorPB.newBuilder();
builder.setSourceType(TxnSourceTypePB.forNumber(txnCoordinator.sourceType.value()));
builder.setId(txnCoordinator.id);
builder.setIp(txnCoordinator.ip);
builder.setStartTime(txnCoordinator.startTime);
return builder.build();
}

public static TxnCoordinator txnCoordinatorFromPb(TxnCoordinatorPB txnCoordinatorPB) {
TxnCoordinator txnCoordinator = new TxnCoordinator();
txnCoordinator.sourceType = TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber());
txnCoordinator.ip = txnCoordinatorPB.getIp();
return txnCoordinator;
return new TxnCoordinator(TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()),
txnCoordinatorPB.getId(), txnCoordinatorPB.getIp(), txnCoordinatorPB.getStartTime());
}

public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
Expand Down Expand Up @@ -353,6 +354,11 @@ private String getCloudClusterName(HttpServletRequest request) {

private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit)
throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
if (Config.isCloudMode()) {
String cloudClusterName = getCloudClusterName(request);
if (Strings.isNullOrEmpty(cloudClusterName)) {
Expand Down
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand Down Expand Up @@ -284,8 +285,9 @@ public void setCountDownLatch(MarkedCountDownLatch<Long, Long> countDownLatch) {
public long beginTxn() throws Exception {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
Lists.newArrayList(deleteInfo.getTableId()), label, null,
new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second);
this.transactionId = txnId;
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -123,7 +124,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
Expand Down Expand Up @@ -199,7 +200,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -199,7 +200,9 @@ public boolean beginTxn() throws UserException {
try {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
Expand Down Expand Up @@ -132,8 +133,10 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
Expand Down Expand Up @@ -193,9 +194,10 @@ private static void beginBatchInsertTransaction(ConnectContext ctx,
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
} else {
MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TOlapTableLocationParam;
Expand Down Expand Up @@ -96,7 +97,9 @@ public void beginTransaction() {
try {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
Expand Down Expand Up @@ -2197,9 +2198,10 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,9 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp);
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
if (request.isSetToken()) {
txnCoord.isFromInternal = true;
}
Expand Down Expand Up @@ -1325,10 +1327,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp)
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;

Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), tableIdList, request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);

// step 7: return result
Expand Down Expand Up @@ -2105,6 +2109,25 @@ private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResu
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
httpStreamParams.getParams().setNumLocalSink(1);

TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(
httpStreamParams.getDb().getId(), httpStreamParams.getTxnId());
if (txnState == null) {
LOG.warn("Not found http stream related txn, txn id = {}", httpStreamParams.getTxnId());
} else {
TxnCoordinator txnCoord = txnState.getCoordinator();
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
if (backend != null) {
// only modify txnCoord in memory, not write editlog yet.
txnCoord.sourceType = TxnSourceType.BE;
txnCoord.id = backend.getId();
txnCoord.ip = backend.getHost();
txnCoord.startTime = backend.getLastStartTime();
LOG.info("Change http stream related txn {} to coordinator {}",
httpStreamParams.getTxnId(), txnCoord);
}
}

result.setPipelineParams(httpStreamParams.getParams());
result.getPipelineParams().setDbName(httpStreamParams.getDb().getFullName());
result.getPipelineParams().setTableName(httpStreamParams.getTable().getName());
Expand Down
13 changes: 10 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
if (hbResponse.getStatus() != HbStatus.OK) {
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
if (!isReplay && oldStartTime != newStartTime) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
be.getId(), be.getHost(), newStartTime);
}
} else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
Env.getCurrentGlobalTransactionMgr()
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
be.getId(), be.getHost(), 100);
}
}
return isChanged;
Expand Down
Loading