Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eb6aa52
[feat](streaming job) introduce streaming job schedule task (#55790)
sollhui Sep 9, 2025
a69c8fd
[Feature](WIP) Add Create StreamingJob and Alter Job (#55814)
JNSimba Sep 9, 2025
47f2e1e
[feat](streaming job) introduce streaming task scheduler (#55857)
sollhui Sep 10, 2025
cc4c616
[Feature](WIP) add StreamingInsertTask and improve StreamInsertJob (#…
JNSimba Sep 11, 2025
e2a492c
[feat](streaming job) implement offset persistence and replay (#55918)
sollhui Sep 11, 2025
4c6f13c
[Feature](WIP) add S3 Stream job split offset (#55927)
JNSimba Sep 12, 2025
991de2d
[Feature](WIP) Fix streaming job problem (#55949)
JNSimba Sep 12, 2025
ad763ce
[Feature](WIP) Add fetch meta and fix rewrite tvf problem (#55986)
JNSimba Sep 12, 2025
ec65dcb
[feat](streaming job) implement offset persistence and replay in clou…
sollhui Sep 14, 2025
434b2b3
[fix](streaming job) register listener id when begin transaction (#56…
sollhui Sep 15, 2025
3390210
[Feature](WIP) Add create job case and fix job bug (#56119)
JNSimba Sep 17, 2025
406276e
[fix](streaming job) add task commit check and job event lock to ensu…
sollhui Sep 17, 2025
a541e08
[fix](streaming job) fix compile error (#56141)
sollhui Sep 17, 2025
cbf74bf
[fix](streaming job) fix register callback id invalid (#56142)
sollhui Sep 17, 2025
0fbe683
[revert](streaming job) revert "implement offset persistence and repl…
sollhui Sep 17, 2025
23c6fd7
Revert "[revert](streaming job) revert "implement offset persistence …
JNSimba Sep 17, 2025
b2adc93
[Feature](WIP) Fix Alter Job and schedule bug etc (#56166)
JNSimba Sep 17, 2025
5ce7a4b
Merge branch 'master' into streaming-job-dev
JNSimba Sep 18, 2025
777ba16
[Featuren](WIP) add alter job op and fix bug (#56194)
JNSimba Sep 18, 2025
1fb95b2
Merge branch 'master' into streaming-job-dev
JNSimba Sep 19, 2025
4db2ec2
[fix](streaming job) fix some streaming job bug (#56221)
sollhui Sep 19, 2025
71a8bac
[Fix](job) fix job bug (#56225)
JNSimba Sep 19, 2025
4f5f56a
[fix](streaming job) refactor some logic and add ut (#56292)
sollhui Sep 22, 2025
e0ee446
[Feature](wip) improve streamjob and delete alterstreamjob op (#56308)
JNSimba Sep 22, 2025
6d13742
[fix](streaming job) fix streaming job ut (#56314)
sollhui Sep 22, 2025
b8719f1
Merge branch 'master' into streaming-job-dev
JNSimba Sep 23, 2025
d471124
[Fix](job) improve job api (#56352)
JNSimba Sep 23, 2025
698fd51
fix format
sollhui Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_b
BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", "remove_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", "get_streaming_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
Expand Down Expand Up @@ -370,6 +371,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter("rpc_kv_precommit_txn_ge
mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_put_counter",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
Expand Down Expand Up @@ -558,6 +561,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes("rpc_kv_precommit_txn_get_
mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_bytes",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
Expand Down
3 changes: 3 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
Expand Down Expand Up @@ -476,6 +477,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
Expand Down Expand Up @@ -606,6 +608,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
Expand Down
13 changes: 13 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ class MetaServiceImpl : public cloud::MetaService {
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override;
Expand Down Expand Up @@ -831,6 +836,14 @@ class MetaServiceProxy final : public MetaService {
done);
}

void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_streaming_task_commit_attach, controller, request,
response, done);
}

void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
142 changes: 142 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,72 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
<< " routine load new progress: " << new_progress_info.ShortDebugString();
}

void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
const std::string& instance_id, const CommitTxnRequest* request,
Transaction* txn, int64_t db_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
if (!request->has_commit_attachment()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "missing commit attachment, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
StreamingTaskCommitAttachmentPB commit_attachment =
txn_commit_attachment.streaming_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();

std::string streaming_job_val;
bool prev_existed = true;
std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
TxnErrorCode err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_existed = false;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get streaming job, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
return;
}

StreamingTaskCommitAttachmentPB new_job_info;
if (prev_existed) {
if (!new_job_info.ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
commit_attachment.scanned_rows());
new_job_info.set_load_bytes(new_job_info.load_bytes() + commit_attachment.load_bytes());
new_job_info.set_num_files(new_job_info.num_files() + commit_attachment.num_files());
new_job_info.set_file_bytes(new_job_info.file_bytes() + commit_attachment.file_bytes());
} else {
new_job_info.set_job_id(commit_attachment.job_id());
new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
new_job_info.set_load_bytes(commit_attachment.load_bytes());
new_job_info.set_num_files(commit_attachment.num_files());
new_job_info.set_file_bytes(commit_attachment.file_bytes());
}
if (commit_attachment.has_offset()) {
new_job_info.set_offset(commit_attachment.offset());
}
std::string new_job_val;
if (!new_job_info.SerializeToString(&new_job_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new streaming job val, txn_id=" << txn_id;
msg = ss.str();
return;
}

txn->put(streaming_job_key_str, new_job_val);
LOG(INFO) << "put streaming_job_key key=" << hex(streaming_job_key_str)
<< " streaming job new meta: " << new_job_info.ShortDebugString();
}

void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
const GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse* response,
Expand Down Expand Up @@ -678,6 +744,62 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
}
}

void MetaServiceImpl::get_streaming_task_commit_attach(
::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_streaming_task_commit_attach, get);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(get_streaming_task_commit_attach)

TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}

if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}

int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string streaming_job_val;
std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
msg = ss.str();
return;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}

StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
if (!commit_attach->ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id;
msg = ss.str();
return;
}
}

void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
Expand Down Expand Up @@ -1577,6 +1699,16 @@ void MetaServiceImpl::commit_txn_immediately(
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}

if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
<< " code=" << code << " msg=" << msg;
return;
}
}

LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
Expand Down Expand Up @@ -1970,6 +2102,16 @@ void MetaServiceImpl::commit_txn_eventually(
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}

if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
<< " code=" << code << " msg=" << msg;
return;
}
}

// save versions for partition
int64_t version_update_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
Expand Down
13 changes: 11 additions & 2 deletions cloud/src/meta-store/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ static const char* STATS_KEY_INFIX_TABLET = "tablet";

static const char* JOB_KEY_INFIX_TABLET = "tablet";
static const char* JOB_KEY_INFIX_RL_PROGRESS = "routine_load_progress";
static const char* JOB_KEY_INFIX_STREAMING_JOB = "streaming_job";
static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet";
static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset";

Expand Down Expand Up @@ -145,7 +146,7 @@ static void encode_prefix(const T& t, std::string* key) {
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, JobRestoreRowsetKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, StreamingJobKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo, MetaSchemaPBDictionaryInfo,
MowTabletJobInfo>);

Expand Down Expand Up @@ -182,7 +183,8 @@ static void encode_prefix(const T& t, std::string* key) {
encode_bytes(STATS_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
|| std::is_same_v<T, JobRecycleKeyInfo>
|| std::is_same_v<T, RLJobProgressKeyInfo>) {
|| std::is_same_v<T, RLJobProgressKeyInfo>
|| std::is_same_v<T, StreamingJobKeyInfo>) {
encode_bytes(JOB_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
|| std::is_same_v<T, CopyFileKeyInfo>) {
Expand Down Expand Up @@ -464,6 +466,13 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out)
encode_int64(std::get<2>(in), out); // job_id
}

void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_STREAMING_JOB, out); // "streaming_job"
encode_int64(std::get<1>(in), out); // db_id
encode_int64(std::get<2>(in), out); // job_id
}

void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_RESTORE_TABLET, out); // "restore_tablet"
Expand Down
7 changes: 7 additions & 0 deletions cloud/src/meta-store/keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
// 0x01 "job" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} -> TabletJobInfoPB
// 0x01 "job" ${instance_id} "recycle" -> JobRecyclePB
// 0x01 "job" ${instance_id} "check" -> JobRecyclePB
// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id} -> StreamingJobPB

//
// 0x01 "copy" ${instance_id} "job" ${stage_id} ${table_id} ${copy_id} ${group_id} -> CopyJobPB
// 0x01 "copy" ${instance_id} "loading_file" ${stage_id} ${table_id} ${obj_name} ${etag} -> CopyFilePB
Expand Down Expand Up @@ -218,6 +220,9 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 , std::tuple<std::string, in
// 0:instance_id 1:db_id 2:job_id
using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, int64_t>>;

// 0:instance_id 1:db_id 2:job_id
using StreamingJobKeyInfo = BasicKeyInfo<52, std::tuple<std::string, int64_t, int64_t>>;

// 0:instance_id 1:vault_id
using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, std::string>>;

Expand Down Expand Up @@ -407,6 +412,8 @@ void job_tablet_key(const JobTabletKeyInfo& in, std::string* out);
static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { std::string s; job_tablet_key(in, &s); return s; }
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out);
static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo& in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out);
static inline std::string streaming_job_key(const StreamingJobKeyInfo& in) { std::string s; streaming_job_key(in, &s); return s; }

std::string copy_key_prefix(std::string_view instance_id);
void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
Expand Down
Loading
Loading