Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_b
BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", "remove_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", "get_streaming_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
Expand Down Expand Up @@ -364,6 +365,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter("rpc_kv_precommit_txn_ge
mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_put_counter",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
Expand Down Expand Up @@ -527,6 +530,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes("rpc_kv_precommit_txn_get_
mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_bytes",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
Expand Down
3 changes: 3 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
Expand Down Expand Up @@ -470,6 +471,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
Expand Down Expand Up @@ -582,6 +584,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
Expand Down
13 changes: 13 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ class MetaServiceImpl : public cloud::MetaService {
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override;
Expand Down Expand Up @@ -820,6 +825,14 @@ class MetaServiceProxy final : public MetaService {
done);
}

void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_streaming_task_commit_attach, controller, request,
response, done);
}

void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
137 changes: 137 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,75 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
<< " routine load new progress: " << new_progress_info.ShortDebugString();
}

void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
const CommitTxnRequest* request, Transaction* txn, int64_t db_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
if (!request->has_commit_attachment()) {
ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
StreamingTaskCommitAttachmentPB commit_attachment =
txn_commit_attachment.streaming_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();

std::string streaming_meta_key;
std::string streaming_meta_val;
bool prev_meta_existed = true;
StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id};
streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val);
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_meta_existed = false;
} else {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get streaming job meta, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
return;
}
}

StreamingTaskCommitAttachmentPB new_meta_info;
if (prev_meta_existed) {
if (!new_meta_info.ParseFromString(streaming_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() +
commit_attachment.scanned_rows());
new_meta_info.set_load_bytes(new_meta_info.load_bytes() + commit_attachment.load_bytes());
new_meta_info.set_file_number(new_meta_info.file_number() +
commit_attachment.file_number());
new_meta_info.set_file_size(new_meta_info.file_size() + commit_attachment.file_size());
} else {
new_meta_info.set_job_id(commit_attachment.job_id());
new_meta_info.set_scanned_rows(commit_attachment.scanned_rows());
new_meta_info.set_load_bytes(commit_attachment.load_bytes());
new_meta_info.set_file_number(commit_attachment.file_number());
new_meta_info.set_file_size(commit_attachment.file_size());
}
if (commit_attachment.has_offset()) {
new_meta_info.set_offset(commit_attachment.offset());
}
std::string new_meta_val;
if (!new_meta_info.SerializeToString(&new_meta_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new streaming meta val, txn_id=" << txn_id;
msg = ss.str();
return;
}

txn->put(streaming_meta_key, new_meta_val);
LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key)
<< " streaming job new meta: " << new_meta_info.ShortDebugString();
}

void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
const GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse* response,
Expand Down Expand Up @@ -678,6 +747,64 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
}
}

void MetaServiceImpl::get_streaming_task_commit_attach(
::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_streaming_task_commit_attach, get);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(get_streaming_task_commit_attach)

TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}

if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}

int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string streaming_meta_key;
std::string streaming_meta_val;
StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id};
streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
err = txn->get(streaming_meta_key, &streaming_meta_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
msg = ss.str();
return;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}

StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
if (!commit_attach->ParseFromString(streaming_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id;
msg = ss.str();
return;
}
}

void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
Expand Down Expand Up @@ -1572,6 +1699,11 @@ void MetaServiceImpl::commit_txn_immediately(
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}

if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
put_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
}

LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
Expand Down Expand Up @@ -1965,6 +2097,11 @@ void MetaServiceImpl::commit_txn_eventually(
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}

if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
put_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
}

// save versions for partition
int64_t version_update_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
Expand Down
13 changes: 11 additions & 2 deletions cloud/src/meta-store/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ static const char* STATS_KEY_INFIX_TABLET = "tablet";

static const char* JOB_KEY_INFIX_TABLET = "tablet";
static const char* JOB_KEY_INFIX_RL_PROGRESS = "routine_load_progress";
static const char* JOB_KEY_INFIX_STREAMING_JOB_META = "streaming_job_meta";
static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet";
static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset";

Expand Down Expand Up @@ -144,7 +145,7 @@ static void encode_prefix(const T& t, std::string* key) {
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, JobRestoreRowsetKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, StreamingJobMetaKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo, MetaSchemaPBDictionaryInfo,
MowTabletJobInfo>);

Expand Down Expand Up @@ -181,7 +182,8 @@ static void encode_prefix(const T& t, std::string* key) {
encode_bytes(STATS_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
|| std::is_same_v<T, JobRecycleKeyInfo>
|| std::is_same_v<T, RLJobProgressKeyInfo>) {
|| std::is_same_v<T, RLJobProgressKeyInfo>
|| std::is_same_v<T, StreamingJobMetaKeyInfo>) {
encode_bytes(JOB_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
|| std::is_same_v<T, CopyFileKeyInfo>) {
Expand Down Expand Up @@ -463,6 +465,13 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out)
encode_int64(std::get<2>(in), out); // job_id
}

void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out); // "streaming_job_meta"
encode_int64(std::get<1>(in), out); // db_id
encode_int64(std::get<2>(in), out); // job_id
}

void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_RESTORE_TABLET, out); // "restore_tablet"
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/meta-store/keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 , std::tuple<std::string, in
// 0:instance_id 1:db_id 2:job_id
using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, int64_t>>;

// 0:instance_id 1:db_id 2:job_id
using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string, int64_t, int64_t>>;

// 0:instance_id 1:vault_id
using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, std::string>>;

Expand Down Expand Up @@ -407,6 +410,8 @@ void job_tablet_key(const JobTabletKeyInfo& in, std::string* out);
static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { std::string s; job_tablet_key(in, &s); return s; }
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out);
static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo& in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, std::string* out);
static inline std::string streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in, &s); return s; }

std::string copy_key_prefix(std::string_view instance_id);
void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,10 @@ public Cloud. ResetRLProgressResponse resetRLProgress(Cloud. ResetRLProgressRequ
return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
.createInstance(request);
}

public Cloud.GetStreamingTaskCommitAttachResponse
getStreamingTaskCommitAttach(Cloud.GetStreamingTaskCommitAttachRequest request) {
return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
.getStreamingTaskCommitAttach(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,4 +432,9 @@ public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressReques
public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
return w.executeRequest((client) -> client.createInstance(request));
}

public Cloud.GetStreamingTaskCommitAttachResponse getStreamingTaskCommitAttach(
Cloud.GetStreamingTaskCommitAttachRequest request) throws RpcException {
return w.executeRequest((client) -> client.getStreamingTaskCommitAttach(request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.event.DataChangeEvent;
import org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.metric.MetricRepo;
Expand Down Expand Up @@ -619,6 +620,19 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
}
builder.setCommitAttachment(TxnUtil
.rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment));
} else if (txnCommitAttachment instanceof StreamingTaskTxnCommitAttachment) {
StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachment =
(StreamingTaskTxnCommitAttachment) txnCommitAttachment;
TxnStateChangeCallback cb = callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
if (cb != null) {
// use a temporary transaction state to do before commit check,
// what actually works is the transactionId
TransactionState tmpTxnState = new TransactionState();
tmpTxnState.setTransactionId(transactionId);
cb.beforeCommitted(tmpTxnState);
}
builder.setCommitAttachment(TxnUtil
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
} else {
throw new UserException("invalid txnCommitAttachment");
}
Expand Down
Loading
Loading