Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
request.__set_max_filter_ratio(ctx->max_filter_ratio);
}
request.__set_create_timestamp(UnixMillis());
request.__set_request_id(ctx->id.to_thrift());
// begin load by master
const TNetworkAddress& master_addr = _exec_env->master_info()->network_address;
TMiniLoadBeginResult res;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
if (ctx->timeout_second != -1) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());

TLoadTxnBeginResult result;
#ifndef BE_TEST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Set<String> getTableNames() throws MetaNotFoundException{
@Override
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(),
.beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
timeoutSecond);
}
Expand Down
7 changes: 7 additions & 0 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements

protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

// this request id is only used for checking if a load begin request is a duplicate request.
protected TUniqueId requestId;

// only for log replay
public LoadJob() {
}
Expand Down Expand Up @@ -191,6 +194,10 @@ public long getTransactionId() {
return transactionId;
}

public TUniqueId getRequestId() {
return requestId;
}

/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used instead.
Expand Down
19 changes: 10 additions & 9 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void createLoadJobFromStmt(LoadStmt stmt, String originStmt) throws DdlEx
LoadJob loadJob = null;
writeLock();
try {
checkLabelUsed(dbId, stmt.getLabel().getLabelName(), -1);
checkLabelUsed(dbId, stmt.getLabel().getLabelName(), null);
if (stmt.getBrokerDesc() == null) {
throw new DdlException("LoadManager only support the broker load.");
}
Expand Down Expand Up @@ -134,7 +135,7 @@ public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws User
LoadJob loadJob = null;
writeLock();
try {
checkLabelUsed(database.getId(), request.getLabel(), request.getCreate_timestamp());
checkLabelUsed(database.getId(), request.getLabel(), request.getRequest_id());
loadJob = new MiniLoadJob(database.getId(), request);
createLoadJob(loadJob);
// Mini load job must be executed before release write lock.
Expand Down Expand Up @@ -184,7 +185,7 @@ public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long time
Database database = checkDb(stmt.getLabel().getDbName());
writeLock();
try {
checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(), -1);
checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(), null);
Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt, jobType, timestamp);
} finally {
writeUnlock();
Expand All @@ -209,7 +210,7 @@ public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlEx
Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
writeLock();
try {
checkLabelUsed(database.getId(), request.getLabel(), -1);
checkLabelUsed(database.getId(), request.getLabel(), null);
return Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(request);
} finally {
writeUnlock();
Expand All @@ -220,7 +221,7 @@ public void createLoadJobV1FromMultiStart(String fullDbName, String label) throw
Database database = checkDb(fullDbName);
writeLock();
try {
checkLabelUsed(database.getId(), label, -1);
checkLabelUsed(database.getId(), label, null);
Catalog.getCurrentCatalog().getLoadInstance()
.registerMiniLabel(fullDbName, label, System.currentTimeMillis());
} finally {
Expand Down Expand Up @@ -501,10 +502,10 @@ private void checkTable(Database database, String tableName) throws DdlException
*
* @param dbId
* @param label
* @param createTimestamp the create timestamp of stmt of request
* @param requestId: the uuid of each txn request from BE
* @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
*/
private void checkLabelUsed(long dbId, String label, long createTimestamp)
private void checkLabelUsed(long dbId, String label, TUniqueId requestId)
throws DdlException {
// if label has been used in old load jobs
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
Expand All @@ -517,9 +518,9 @@ private void checkLabelUsed(long dbId, String label, long createTimestamp)
labelLoadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).findFirst();
if (loadJobOptional.isPresent()) {
LoadJob loadJob = loadJobOptional.get();
if (loadJob.getCreateTimestamp() == createTimestamp) {
if (loadJob.getRequestId() != null && requestId != null && loadJob.getRequestId().equals(requestId)) {
throw new DuplicatedRequestException(String.valueOf(loadJob.getId()),
"The request is duplicated with " + loadJob.getId());
"The request is duplicated with " + loadJob.getId());
}
LOG.warn("Failed to add load job when label {} has been used.", label);
throw new LabelAlreadyUsedException(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound
this.createTimestamp = request.getCreate_timestamp();
this.loadStartTimestamp = createTimestamp;
this.authorizationInfo = gatherAuthInfo();
this.requestId = request.getRequest_id();
}

@Override
Expand All @@ -87,7 +88,7 @@ public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
@Override
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(),
.beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
timeoutSecond);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "FE: " + FrontendOptions.getLocalHostAddress(),
routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
routineLoadJob.getMaxBatchIntervalS() * 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,9 @@ private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) thr
}

// begin
long timestamp = request.isSetTimestamp() ? request.getTimestamp() : -1;
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), request.getLabel(), timestamp, "BE: " + clientIp,
db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -107,22 +108,22 @@ public TxnStateCallbackFactory getCallbackFactory() {

public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType,
long timeoutSecond) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException {
return beginTransaction(dbId, label, -1, coordinator, sourceType, -1, timeoutSecond);
return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond);
}

/**
* the app could specify the transaction id
*
* timestamp is used to judge that whether the request is a internal retry request
* if label already exist, and timestamp are equal, we return the exist tid, and consider this 'begin'
* requestId is used to judge that whether the request is a internal retry request
* if label already exist, and requestId are equal, we return the exist tid, and consider this 'begin'
* as success.
* timestamp == -1 is for compatibility
* requestId == null is for compatibility
*
* @param coordinator
* @throws BeginTransactionException
* @throws IllegalTransactionParameterException
*/
public long beginTransaction(long dbId, String label, long timestamp,
public long beginTransaction(long dbId, String label, TUniqueId requestId,
String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException {

Expand All @@ -145,10 +146,11 @@ public long beginTransaction(long dbId, String label, long timestamp,
Map<String, Long> txnLabels = dbIdToTxnLabels.row(dbId);
if (txnLabels != null && txnLabels.containsKey(label)) {
// check timestamp
if (timestamp != -1) {
if (requestId != null) {
TransactionState existTxn = getTransactionState(txnLabels.get(label));
if (existTxn != null && existTxn.getTransactionStatus() == TransactionStatus.PREPARE
&& existTxn.getTimestamp() == timestamp) {
&& existTxn.getRequsetId() != null && existTxn.getRequsetId().equals(requestId)) {
// this may be a retry request for same job, just return existing txn id.
return txnLabels.get(label);
}
}
Expand All @@ -161,7 +163,7 @@ public long beginTransaction(long dbId, String label, long timestamp,
}
long tid = idGenerator.getNextTransactionId();
LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
TransactionState transactionState = new TransactionState(dbId, tid, label, timestamp, sourceType,
TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType,
coordinator, listenerId, timeoutSecond * 1000);
transactionState.setPrepareTime(System.currentTimeMillis());
unprotectUpsertTransactionState(transactionState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -109,9 +110,9 @@ public String toString() {
private long dbId;
private long transactionId;
private String label;
// timestamp is used to judge whether a begin request is a internal retry request.
// no need to persist it
private long timestamp;
// requsetId is used to judge whether a begin request is a internal retry request.
// no need to persist it.
private TUniqueId requsetId;
private Map<Long, TableCommitInfo> idToTableCommitInfos;
// coordinator is show who begin this txn (FE, or one of BE, etc...)
private String coordinator;
Expand Down Expand Up @@ -146,7 +147,6 @@ public TransactionState() {
this.dbId = -1;
this.transactionId = -1;
this.label = "";
this.timestamp = -1;
this.idToTableCommitInfos = Maps.newHashMap();
this.coordinator = "";
this.transactionStatus = TransactionStatus.PREPARE;
Expand All @@ -161,12 +161,12 @@ public TransactionState() {
this.latch = new CountDownLatch(1);
}

public TransactionState(long dbId, long transactionId, String label, long timestamp,
public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId,
LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) {
this.dbId = dbId;
this.transactionId = transactionId;
this.label = label;
this.timestamp = timestamp;
this.requsetId = requsetId;
this.idToTableCommitInfos = Maps.newHashMap();
this.coordinator = coordinator;
this.transactionStatus = TransactionStatus.PREPARE;
Expand Down Expand Up @@ -215,9 +215,9 @@ public long getPublishVersionTime() {
public boolean hasSendTask() {
return this.hasSendTask;
}
public long getTimestamp() {
return timestamp;

public TUniqueId getRequsetId() {
return requsetId;
}

public long getTransactionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.Load;
import org.apache.doris.metric.LongCounterMetric;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -104,7 +104,8 @@ public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr,
LoadJob loadJob = new BrokerLoadJob();
new Expectations() {
{
globalTransactionMgr.beginTransaction(anyLong, anyString, anyLong, anyString, (TransactionState.LoadJobSourceType) any, anyLong, anyLong);
globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString,
(TransactionState.LoadJobSourceType) any, anyLong, anyLong);
result = 1;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.doris.transaction;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
Expand Down Expand Up @@ -314,7 +317,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L,
TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
Expand Down Expand Up @@ -380,7 +383,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L,
TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
Expand Down
6 changes: 4 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ struct TMiniLoadBeginRequest {
9: optional i64 timeout_second
10: optional double max_filter_ratio
11: optional i64 auth_code
12: optional i64 create_timestamp;
12: optional i64 create_timestamp
13: optional Types.TUniqueId request_id
}

struct TIsMethodSupportedRequest {
Expand All @@ -481,10 +482,11 @@ struct TLoadTxnBeginRequest {
5: required string tbl
6: optional string user_ip
7: required string label
8: optional i64 timestamp
8: optional i64 timestamp // deprecated, use request_id instead
9: optional i64 auth_code
// The real value of timeout should be i32. i64 ensures the compatibility of interface.
10: optional i64 timeout
11: optional Types.TUniqueId request_id
}

struct TLoadTxnBeginResult {
Expand Down