Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");

BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;

Expand Down
10 changes: 10 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ class MetaServiceImpl : public cloud::MetaService {
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

// Server-side handler for the `reset_rl_progress` RPC of cloud::MetaService.
// The outcome is delivered through `response`; `done` is the protobuf
// completion closure supplied by the RPC framework.
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override;

void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override;

Expand Down Expand Up @@ -616,6 +620,12 @@ class MetaServiceProxy final : public MetaService {
done);
}

// Proxy entry point for `reset_rl_progress`: hands the member-function pointer
// and all RPC arguments to call_impl unchanged (call_impl is defined outside
// this view; presumably it dispatches to the wrapped implementation -- confirm).
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::reset_rl_progress, controller, request, response, done);
}

void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done);
Expand Down
52 changes: 52 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,58 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
}
}

// Handles the `reset_rl_progress` RPC: removes the stored routine load
// progress entry addressed by (instance_id, db_id, job_id).
//
// Failures are reported by assigning `code` and `msg` and returning early.
// Those variables (together with `instance_id` and `ss`) are not declared in
// this function; presumably RPC_PREPROCESS introduces them and copies the
// result into `response` on exit -- confirm against the macro definition.
//
// Error codes set here:
//   INVALID_ARGUMENT                 - unknown cloud_unique_id, or db_id/job_id missing
//   ROUTINE_LOAD_PROGRESS_NOT_FOUND  - commit reported TXN_KEY_NOT_FOUND
//   cast_as<CREATE>/cast_as<COMMIT>  - transaction creation / commit failed
void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
                                        const ResetRLProgressRequest* request,
                                        ResetRLProgressResponse* response,
                                        ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(reset_rl_progress);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(reset_rl_progress)

    // Reject malformed requests before touching the KV store, so an invalid
    // request never pays for (or fails on) transaction creation.
    if (!request->has_db_id() || !request->has_job_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    const int64_t db_id = request->db_id();
    const int64_t job_id = request->job_id();

    // Build the progress key for this job and delete it in one transaction.
    std::string rl_progress_key;
    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
    txn->remove(rl_progress_key);

    err = txn->commit();
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // NOTE(review): the key is removed without being read first; confirm
        // that commit() can actually report TXN_KEY_NOT_FOUND for this path.
        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
        ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
        msg = ss.str();
        return;
    } else if (err != TxnErrorCode::TXN_OK) {
        // The failing operation is the commit, so report it under the commit
        // category rather than as a read error.
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to remove progress info, db_id=" << db_id << " job_id=" << job_id
           << " err=" << err;
        msg = ss.str();
        return;
    }
}

void scan_tmp_rowset(
const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv>& txn_kv,
MetaServiceCode& code, std::string& msg, int64_t* db_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,16 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) {
return blockingStub.getRlTaskCommitAttach(request);
}

/**
 * Sends a reset-routine-load-progress request through the blocking stub.
 * When the caller did not set a cloud unique id, a copy of the request is
 * sent with {@code Config.cloud_unique_id} filled in; otherwise the request
 * is sent as given.
 */
public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request) {
    Cloud.ResetRLProgressRequest outgoing = request;
    if (!request.hasCloudUniqueId()) {
        Cloud.ResetRLProgressRequest.Builder withUniqueId = Cloud.ResetRLProgressRequest.newBuilder();
        withUniqueId.mergeFrom(request);
        withUniqueId.setCloudUniqueId(Config.cloud_unique_id);
        outgoing = withUniqueId.build();
    }
    return blockingStub.resetRlProgress(outgoing);
}

public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
if (!request.hasCloudUniqueId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo
}
}

/**
 * Forwards a reset-routine-load-progress request to the client returned by
 * {@code getProxy()}.
 *
 * @throws RpcException wrapping any exception raised while obtaining the
 *         client or performing the call (message and cause are preserved)
 */
public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request)
        throws RpcException {
    try {
        return getProxy().resetRLProgress(request);
    } catch (Exception cause) {
        throw new RpcException("", cause.getMessage(), cause);
    }
}

public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ private void getReadableProgress(ConcurrentMap<Integer, String> showPartitionIdT
}
}

// modify the partition offset of this progress.
// throw exception is the specified partition does not exist in progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
/**
 * Verifies that every partition id in {@code kafkaPartitionOffsets} is already
 * a key of {@code partitionIdToOffset}. Nothing is modified.
 *
 * @param kafkaPartitionOffsets pairs whose {@code first} is the partition id to look up
 * @throws DdlException for the first partition id that is not present
 */
public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
    for (Pair<Integer, Long> requested : kafkaPartitionOffsets) {
        if (partitionIdToOffset.containsKey(requested.first)) {
            continue;
        }
        throw new DdlException("The specified partition " + requested.first + " is not in the consumed partitions");
    }
}

// modify the partition offset of this progress.
// no validation is done here: call checkPartitions() first if the partitions must already exist in progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
}

// modify partition offset first
if (!kafkaPartitionOffsets.isEmpty()) {
// we can only modify the partition that is being consumed
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}

// convertCustomProperties and check partitions before reset progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
// modify broker list and topic
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
this.brokerList = dataSourceProperties.getBrokerList();

if (!kafkaPartitionOffsets.isEmpty()) {
((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
}

// It is necessary to reset the Kafka progress cache if topic change,
// and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
if (Config.isCloudMode()) {
resetCloudProgress();
}
this.topic = dataSourceProperties.getTopic();
this.progress = new KafkaProgress();
}

// modify partition offset
if (!kafkaPartitionOffsets.isEmpty()) {
// we can only modify the partition that is being consumed
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}

// modify broker list
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
this.brokerList = dataSourceProperties.getBrokerList();
}
}
if (!jobProperties.isEmpty()) {
Expand All @@ -743,6 +756,31 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
this.id, jobProperties, dataSourceProperties);
}

/**
 * Asks the meta service to reset the routine load progress stored for this
 * job, identified by {@code Config.cloud_unique_id}, {@code dbId} and {@code id}.
 *
 * A {@code ROUTINE_LOAD_PROGRESS_NOT_FOUND} reply is tolerated: it is logged
 * and the method returns normally.
 *
 * @throws DdlException if the RPC fails, or the meta service answers with any
 *         status other than OK / ROUTINE_LOAD_PROGRESS_NOT_FOUND
 */
private void resetCloudProgress() throws DdlException {
    Cloud.ResetRLProgressRequest.Builder builder =
            Cloud.ResetRLProgressRequest.newBuilder();
    builder.setCloudUniqueId(Config.cloud_unique_id);
    builder.setDbId(dbId);
    builder.setJobId(id);

    Cloud.ResetRLProgressResponse response;
    try {
        response = MetaServiceProxy.getInstance().resetRLProgress(builder.build());
    } catch (RpcException e) {
        // Pass the exception as the throwable argument (not via a "{}"
        // placeholder) so the stack trace reaches the log.
        LOG.warn("failed to reset cloud progress", e);
        throw new DdlException(e.getMessage());
    }

    Cloud.MetaServiceCode statusCode = response.getStatus().getCode();
    if (statusCode == Cloud.MetaServiceCode.OK) {
        return;
    }
    if (statusCode == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
        // Missing progress is tolerated, so it is logged once and not
        // reported as a failure.
        LOG.warn("not found routine load progress, response: {}", response);
        return;
    }
    LOG.warn("failed to reset cloud progress, response: {}", response);
    throw new DdlException(response.getStatus().getMsg());
}

@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
Expand Down
11 changes: 11 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,16 @@ message GetRLTaskCommitAttachResponse {
optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
}

// Request of the `reset_rl_progress` RPC.
// The meta service rejects the request when db_id or job_id is not set.
message ResetRLProgressRequest {
optional string cloud_unique_id = 1; // For auth
optional int64 db_id = 2; // together with job_id, selects the progress entry to reset
optional int64 job_id = 3; // routine load job whose progress is reset
}

// Response of the `reset_rl_progress` RPC.
message ResetRLProgressResponse {
optional MetaServiceResponseStatus status = 1; // result code and message of the reset
}

message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
Expand Down Expand Up @@ -1513,6 +1523,7 @@ service MetaService {

// routine load progress
rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
rpc reset_rl_progress(ResetRLProgressRequest) returns (ResetRLProgressResponse);

// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_topic_change --
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"

-- !sql_topic_change1 --
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
6 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Loading