Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_m
BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict");
BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator("ms", "abort_txn_with_coordinator");
BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label");
BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version");
BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version("ms", "batch_get_version");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label;
Expand Down
13 changes: 13 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class MetaServiceImpl : public cloud::MetaService {
CheckTxnConflictResponse* response,
::google::protobuf::Closure* done) override;

void abort_txn_with_coordinator(::google::protobuf::RpcController* controller,
const AbortTxnWithCoordinatorRequest* request,
AbortTxnWithCoordinatorResponse* response,
::google::protobuf::Closure* done) override;

void clean_txn_label(::google::protobuf::RpcController* controller,
const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response,
::google::protobuf::Closure* done) override;
Expand Down Expand Up @@ -351,6 +356,14 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::check_txn_conflict, controller, request, response, done);
}

void abort_txn_with_coordinator(::google::protobuf::RpcController* controller,
const AbortTxnWithCoordinatorRequest* request,
AbortTxnWithCoordinatorResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::abort_txn_with_coordinator, controller, request, response,
done);
}

void clean_txn_label(::google::protobuf::RpcController* controller,
const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
297 changes: 224 additions & 73 deletions cloud/src/meta-service/meta_service_txn.cpp

Large diffs are not rendered by default.

74 changes: 74 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,77 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) {
ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id());
}

TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
auto meta_service = get_meta_service();

const int64_t db_id = 666;
const int64_t table_id = 777;
const std::string label = "test_label";
const std::string cloud_unique_id = "test_cloud_unique_id";
const int64_t coordinator_id = 15623;
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
std::string host = "127.0.0.1:15586";
int64_t txn_id = -1;

brpc::Controller begin_txn_cntl;
BeginTxnRequest begin_txn_req;
BeginTxnResponse begin_txn_res;
TxnInfoPB txn_info_pb;
TxnCoordinatorPB coordinator;

begin_txn_req.set_cloud_unique_id(cloud_unique_id);
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label(label);
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
coordinator.set_id(coordinator_id);
coordinator.set_ip(host);
coordinator.set_sourcetype(::doris::cloud::TxnSourceTypePB::TXN_SOURCE_TYPE_BE);
coordinator.set_start_time(cur_time);
txn_info_pb.mutable_coordinator()->CopyFrom(coordinator);
begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb);

meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl),
&begin_txn_req, &begin_txn_res, nullptr);
ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK);
txn_id = begin_txn_res.txn_id();
ASSERT_GT(txn_id, -1);

brpc::Controller abort_txn_cntl;
AbortTxnWithCoordinatorRequest abort_txn_req;
AbortTxnWithCoordinatorResponse abort_txn_resp;

abort_txn_req.set_id(coordinator_id);
abort_txn_req.set_ip(host);
abort_txn_req.set_start_time(cur_time + 3600);

// first time to check txn conflict
meta_service->abort_txn_with_coordinator(
reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), &abort_txn_req,
&abort_txn_resp, nullptr);
ASSERT_EQ(abort_txn_resp.status().code(), MetaServiceCode::OK);

brpc::Controller abort_txn_conflict_cntl;
CheckTxnConflictRequest check_txn_conflict_req;
CheckTxnConflictResponse check_txn_conflict_res;

check_txn_conflict_req.set_cloud_unique_id(cloud_unique_id);
check_txn_conflict_req.set_db_id(db_id);
check_txn_conflict_req.set_end_txn_id(txn_id + 1);
check_txn_conflict_req.add_table_ids(table_id);

// first time to check txn conflict
meta_service->check_txn_conflict(
reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl),
&check_txn_conflict_req, &check_txn_conflict_res, nullptr);

ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(check_txn_conflict_res.finished(), true);
ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0);
}

TEST(MetaServiceTest, CheckTxnConflictTest) {
auto meta_service = get_meta_service();

Expand Down Expand Up @@ -2097,6 +2168,8 @@ TEST(MetaServiceTest, CheckTxnConflictTest) {

ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(check_txn_conflict_res.finished(), false);
ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 1);
check_txn_conflict_res.clear_conflict_txns();

// mock rowset and tablet
int64_t tablet_id_base = 123456;
Expand Down Expand Up @@ -2125,6 +2198,7 @@ TEST(MetaServiceTest, CheckTxnConflictTest) {

ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(check_txn_conflict_res.finished(), true);
ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0);

{
std::string running_key = txn_running_key({mock_instance, db_id, txn_id});
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2714,6 +2714,16 @@ public class Config extends ConfigBase {
@ConfField
public static long spilled_profile_storage_limit_bytes = 1 * 1024 * 1024 * 1024; // 1GB

@ConfField(mutable = true, description = {
"是否通过检测协调者BE心跳来 abort 事务",
"SHould abort txn by checking coorinator be heartbeat"})
public static boolean enable_abort_txn_by_checking_coordinator_be = true;

@ConfField(mutable = true, description = {
"是否在 schema change 过程中, 检测冲突事物并 abort 它",
"SHould abort txn by checking conflick txn in schema change"})
public static boolean enable_abort_txn_by_checking_conflict_txn = true;

//==========================================================================
// begin of cloud config
//==========================================================================
Expand Down
24 changes: 19 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.SqlParserUtils;
Expand All @@ -67,6 +69,8 @@
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -373,11 +377,11 @@ protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);

try {
if (!isPreviousLoadFinished()) {
if (!checkFailedPreviousLoadAndAbort()) {
LOG.info("wait transactions before {} to be finished, rollup job: {}", watershedTxnId, jobId);
return;
}
} catch (AnalysisException e) {
} catch (UserException e) {
throw new AlterCancelException(e.getMessage());
}

Expand Down Expand Up @@ -681,10 +685,20 @@ private void cancelInternal() {
}
}

// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() throws AnalysisException {
return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are failed
// and abort it if it is failed.
// If return true, all previous load is finish
protected boolean checkFailedPreviousLoadAndAbort() throws UserException {
List<TransactionState> unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad(
watershedTxnId, dbId, Lists.newArrayList(tableId));
if (Config.enable_abort_txn_by_checking_conflict_txn) {
List<TransactionState> failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns);
for (TransactionState txn : failedTxns) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change");
}
}
return unFinishedTxns.isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.TimeUtils;
Expand All @@ -58,6 +60,8 @@
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -418,11 +422,11 @@ protected void addShadowIndexToCatalog(OlapTable tbl) {
protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);
try {
if (!isPreviousLoadFinished()) {
if (!checkFailedPreviousLoadAndAbort()) {
LOG.info("wait transactions before {} to be finished, schema change job: {}", watershedTxnId, jobId);
return;
}
} catch (AnalysisException e) {
} catch (UserException e) {
throw new AlterCancelException(e.getMessage());
}

Expand Down Expand Up @@ -792,10 +796,20 @@ private void cancelInternal() {
jobState = JobState.CANCELLED;
}

// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() throws AnalysisException {
return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished
// and abort it if it is failed.
// If return true, all previous load is finish
protected boolean checkFailedPreviousLoadAndAbort() throws UserException {
List<TransactionState> unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad(
watershedTxnId, dbId, Lists.newArrayList(tableId));
if (Config.enable_abort_txn_by_checking_conflict_txn) {
List<TransactionState> failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns);
for (TransactionState txn : failedTxns) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change");
}
}
return unFinishedTxns.isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,15 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) {
}
return blockingStub.getObjStoreInfo(request);
}

public Cloud.AbortTxnWithCoordinatorResponse
abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) {
if (!request.hasCloudUniqueId()) {
Cloud.AbortTxnWithCoordinatorRequest.Builder builder =
Cloud.AbortTxnWithCoordinatorRequest.newBuilder();
builder.mergeFrom(request);
return blockingStub.abortTxnWithCoordinator(builder.setCloudUniqueId(Config.cloud_unique_id).build());
}
return blockingStub.abortTxnWithCoordinator(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,14 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo
throw new RpcException("", e.getMessage(), e);
}
}

public Cloud.AbortTxnWithCoordinatorResponse
abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
try {
final MetaServiceClient client = getProxy();
return client.abortTxnWithCoordinator(request);
} catch (Exception e) {
throw new RpcException("", e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest;
import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse;
import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest;
import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
Expand Down Expand Up @@ -1086,6 +1088,42 @@ public void finishTransaction(long dbId, long transactionId, Map<Long, Long> par
throw new UserException("Disallow to call finishTransaction()");
}

public List<TransactionState> getUnFinishedPreviousLoad(long endTransactionId, long dbId, List<Long> tableIdList)
throws UserException {
LOG.info("getUnFinishedPreviousLoad(), endTransactionId:{}, dbId:{}, tableIdList:{}",
endTransactionId, dbId, tableIdList);

if (endTransactionId <= 0) {
throw new UserException("Invaid endTransactionId:" + endTransactionId);
}
CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder();
builder.setDbId(dbId);
builder.setEndTxnId(endTransactionId);
builder.addAllTableIds(tableIdList);
builder.setCloudUniqueId(Config.cloud_unique_id);

final CheckTxnConflictRequest checkTxnConflictRequest = builder.build();
CheckTxnConflictResponse checkTxnConflictResponse = null;
try {
LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest);
checkTxnConflictResponse = MetaServiceProxy
.getInstance().checkTxnConflict(checkTxnConflictRequest);
LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse);
} catch (RpcException e) {
throw new UserException(e.getMessage());
}

if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) {
throw new UserException(checkTxnConflictResponse.getStatus().getMsg());
}
List<TxnInfoPB> conflictTxnInfoPbs = checkTxnConflictResponse.getConflictTxnsList();
List<TransactionState> conflictTxns = new ArrayList<>();
for (TxnInfoPB infoPb : conflictTxnInfoPbs) {
conflictTxns.add(TxnUtil.transactionStateFromPb(infoPb));
}
return conflictTxns;
}

@Override
public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList)
throws AnalysisException {
Expand Down Expand Up @@ -1258,7 +1296,19 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro

@Override
public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) {
// do nothing in cloud mode
AbortTxnWithCoordinatorRequest.Builder builder = AbortTxnWithCoordinatorRequest.newBuilder();
builder.setIp(coordinateHost);
builder.setId(coordinateBeId);
builder.setStartTime(beStartTime);
final AbortTxnWithCoordinatorRequest request = builder.build();
AbortTxnWithCoordinatorResponse response = null;
try {
response = MetaServiceProxy
.getInstance().abortTxnWithCoordinator(request);
LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
} catch (RpcException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
if (!isReplay && oldStartTime != newStartTime) {
if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be
&& oldStartTime != newStartTime) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
be.getId(), be.getHost(), newStartTime);
}
Expand Down
Loading