From ea988a68273deec3f023603ff0aab71f86f4a3e9 Mon Sep 17 00:00:00 2001 From: koarz Date: Mon, 14 Jul 2025 18:09:39 +0800 Subject: [PATCH] [enhance](meta-service) add real request ip for be rpc #53114 --- be/src/cloud/cloud_meta_mgr.cpp | 2 + cloud/src/meta-service/meta_service_helper.h | 51 ++++++++++------- gensrc/proto/cloud.proto | 58 +++++++++++++++++++- 3 files changed, 89 insertions(+), 22 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index e9b145813f19f5..9548a757a24784 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -388,6 +388,8 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); + const_cast(req).set_request_ip(BackendOptions::get_be_endpoint()); + int retry_times = 0; uint32_t duration_ms = 0; std::string error_msg; diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h index 7ecdf856659db0..4e90ec7f17d8cd 100644 --- a/cloud/src/meta-service/meta_service_helper.h +++ b/cloud/src/meta-service/meta_service_helper.h @@ -97,35 +97,44 @@ inline std::string encryt_sk(std::string debug_string) { template void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const Request* req) { if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side(); + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip(); } else if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side(); + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip(); } else if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side() - << " table_id=" << req->table_id() << " tablet_id=" << req->tablet_id() - << " lock_id=" << req->lock_id() << " initiator=" << req->initiator() + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " table_id=" << req->table_id() + << " tablet_id=" << req->tablet_id() << " lock_id=" << req->lock_id() + << " initiator=" << req->initiator() << " delete_bitmap_size=" << req->segment_delete_bitmaps_size(); } else if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " tablet_id=" << req->tablet_id() << " rowset_size=" << req->rowset_ids_size(); } else if constexpr (std::is_same_v) { - VLOG_DEBUG << "begin " << func_name << " from " << ctrl->remote_side() + VLOG_DEBUG << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " tablet size: " << req->tablet_idx().size(); } else if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { - VLOG_DEBUG << "begin " << func_name << " from " << ctrl->remote_side() + VLOG_DEBUG << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " request=" << req->ShortDebugString(); } else if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " tablet_id=" << req->tablet_id() << " rowset_size=" << req->rowset_ids_size(); } else if constexpr (std::is_same_v) { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side() - << " table_id=" << req->table_id() << " lock_id=" << req->lock_id() - << " initiator=" << req->initiator() << " expiration=" << req->expiration() + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " table_id=" << req->table_id() + << " lock_id=" << req->lock_id() << " initiator=" << req->initiator() + << " expiration=" << req->expiration() << " require_compaction_stats=" << req->require_compaction_stats(); } else { - LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "begin " << func_name << " remote caller: " << ctrl->remote_side() + << " original client ip: " << req->request_ip() << " request=" << req->ShortDebugString(); } } @@ -138,21 +147,21 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re res->clear_partition_ids(); res->clear_versions(); } - LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " response=" << res->ShortDebugString(); } else if constexpr (std::is_same_v) { if (res->status().code() != MetaServiceCode::OK) { res->clear_rowset_meta(); } - VLOG_DEBUG << "finish " << func_name << " from " << ctrl->remote_side() + VLOG_DEBUG << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " status=" << res->status().ShortDebugString(); } else if constexpr (std::is_same_v) { - VLOG_DEBUG << "finish " << func_name << " from " << ctrl->remote_side() + VLOG_DEBUG << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " status=" << res->status().ShortDebugString() << " tablet size: " << res->tablet_stats().size(); } else if constexpr (std::is_same_v || std::is_same_v) { - VLOG_DEBUG << "finish " << func_name << " from " << ctrl->remote_side() + VLOG_DEBUG << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " response=" << res->ShortDebugString(); } else if constexpr (std::is_same_v) { if (res->status().code() != MetaServiceCode::OK) { @@ -161,7 +170,7 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re res->clear_versions(); res->clear_segment_delete_bitmaps(); } - LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " status=" << res->status().ShortDebugString() << " tablet=" << res->tablet_id() << " delete_bitmap_count=" << res->segment_delete_bitmaps_size(); @@ -171,16 +180,16 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re res->clear_cumulative_compaction_cnts(); res->clear_cumulative_points(); } - LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " status=" << res->status().ShortDebugString(); } else if constexpr (std::is_same_v || std::is_same_v) { std::string debug_string = encryt_sk(res->DebugString()); TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string); - LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " response=" << debug_string; } else { - LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() + LOG(INFO) << "finish " << func_name << " remote caller: " << ctrl->remote_side() << " response=" << res->ShortDebugString(); } } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 2700f097687dce..2c2350d2290725 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -679,6 +679,7 @@ message ServiceRegistryPB { message BeginTxnRequest { optional string cloud_unique_id = 1; // For auth optional TxnInfoPB txn_info = 2; + optional string request_ip = 3; } message BeginTxnResponse { @@ -695,6 +696,7 @@ message PrecommitTxnRequest { optional int64 txn_id = 3; optional TxnCommitAttachmentPB commit_attachment = 4; optional int64 precommit_timeout_ms = 5; + optional string request_ip = 6; } message PrecommitTxnResponse { @@ -716,6 +718,7 @@ message CommitTxnRequest { optional bool is_txn_load = 9; repeated SubTxnInfo sub_txn_infos = 10; optional bool enable_txn_lazy_commit = 11; + optional string request_ip = 12; } message SubTxnInfo { @@ -749,6 +752,7 @@ message AbortTxnRequest { optional string label = 4; optional string reason = 5; optional TxnCommitAttachmentPB commit_attachment = 6; + optional string request_ip = 7; } message AbortTxnResponse { @@ -761,6 +765,7 @@ message GetTxnRequest { optional int64 db_id = 2; optional int64 txn_id = 3; optional string label = 4; + optional string request_ip = 5; } message GetTxnResponse { @@ -773,6 +778,7 @@ message GetTxnIdRequest { optional int64 db_id = 2; optional string label = 3; repeated TxnStatusPB txn_status = 4; + optional string request_ip = 5; } message GetTxnIdResponse { @@ -790,6 +796,7 @@ message BeginSubTxnRequest { repeated int64 table_ids = 5; // a random label used to generate a sub_txn_id optional string label = 6; + optional string request_ip = 7; } message BeginSubTxnResponse { @@ -808,6 +815,7 @@ message AbortSubTxnRequest { optional int64 db_id = 5; // set table_ids in txn_info repeated int64 table_ids = 6; + optional string request_ip = 7; } message AbortSubTxnResponse { @@ -817,6 +825,7 @@ message AbortSubTxnResponse { message GetCurrentMaxTxnRequest { optional string cloud_unique_id = 1; // For auth + optional string request_ip = 2; } message GetCurrentMaxTxnResponse { @@ -829,6 +838,7 @@ message AbortTxnWithCoordinatorRequest { optional string ip = 2; optional int64 id = 3; optional int64 start_time = 4; + optional string request_ip = 5; } message AbortTxnWithCoordinatorResponse { @@ -841,6 +851,7 @@ message CheckTxnConflictRequest { optional int64 end_txn_id = 3; repeated int64 table_ids = 4; optional bool ignore_timeout_txn = 5; + optional string request_ip = 6; } message CheckTxnConflictResponse { @@ -853,6 +864,7 @@ message CleanTxnLabelRequest { optional string cloud_unique_id = 1; // For auth optional int64 db_id = 2; repeated string labels = 3; + optional string request_ip = 4; } message CleanTxnLabelResponse { @@ -873,6 +885,8 @@ message GetVersionRequest { // True if get table version optional bool is_table_version = 9; + + optional string request_ip = 10; }; message GetVersionResponse { @@ -890,6 +904,7 @@ message GetVersionResponse { message GetObjStoreInfoRequest { optional string cloud_unique_id = 1; // For auth + optional string request_ip = 2; }; message AlterObjStoreInfoRequest { @@ -916,6 +931,7 @@ message AlterObjStoreInfoRequest { optional Operation op = 3; optional StorageVaultPB vault = 4; optional bool set_as_default_storage_vault = 5; + optional string request_ip = 6; } message AlterObjStoreInfoResponse { @@ -928,6 +944,7 @@ message UpdateAkSkRequest { optional string instance_id = 1; repeated RamUserPB internal_bucket_user = 2; optional RamUserPB ram_user = 3; + optional string request_ip = 4; } message UpdateAkSkResponse { @@ -948,6 +965,7 @@ message CreateTabletsRequest { repeated doris.TabletMetaCloudPB tablet_metas = 2; optional string storage_vault_name = 3; optional int64 db_id = 4; + optional string request_ip = 5; } message CreateTabletsResponse { @@ -959,6 +977,7 @@ message CreateTabletsResponse { message UpdateTabletRequest { optional string cloud_unique_id = 1; // For auth repeated TabletMetaInfoPB tablet_meta_infos = 2; + optional string request_ip = 3; } message UpdateTabletResponse { @@ -969,6 +988,7 @@ message UpdateTabletSchemaRequest { optional string cloud_unique_id = 1; // For auth optional int64 tablet_id = 2; optional doris.TabletSchemaCloudPB tablet_schema = 3; + optional string request_ip = 4; } message UpdateTabletSchemaResponse { @@ -978,12 +998,14 @@ message UpdateTabletSchemaResponse { message DropTabletRequest { optional string cloud_unique_id = 1; // For auth optional int64 tablet_id = 2; + optional string request_ip = 3; // TODO: There are more fields TBD } message GetTabletRequest { optional string cloud_unique_id = 1; // For auth optional int64 tablet_id = 2; + optional string request_ip = 3; // TODO: There are more fields TBD } @@ -998,6 +1020,7 @@ message CreateRowsetRequest { optional bool temporary = 3; optional int64 txn_id = 4; optional string tablet_job_id = 5; + optional string request_ip = 6; } message CreateRowsetResponse { @@ -1021,6 +1044,7 @@ message GetRowsetRequest { // returned schema format on rowset schema, used in variant type directly. // for compability reason we use FILL_WITH_DICT as default optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT]; + optional string request_ip = 9; } message GetRowsetResponse { @@ -1034,6 +1058,7 @@ message GetRowsetResponse { message GetSchemaDictRequest { optional string cloud_unique_id = 1; // For auth optional int64 index_id = 2; + optional string request_ip = 3; } message GetSchemaDictResponse { @@ -1048,6 +1073,7 @@ message IndexRequest { optional int64 expiration = 4; optional int64 db_id = 5; optional bool is_new_table = 6; // For table version + optional string request_ip = 7; } message IndexResponse { @@ -1062,6 +1088,7 @@ message PartitionRequest { optional int64 expiration = 5; optional int64 db_id = 6; optional bool need_update_table_version = 7; + optional string request_ip = 8; } message PartitionResponse { @@ -1087,6 +1114,7 @@ message CreateInstanceRequest { optional RamUserPB ram_user = 5; optional bool sse_enabled = 6; optional StorageVaultPB vault = 7; + optional string request_ip = 8; } message CreateInstanceResponse { @@ -1107,6 +1135,7 @@ message AlterInstanceRequest { optional string instance_id = 1; optional Operation op = 2; optional string name = 3; + optional string request_ip = 4; } message AlterInstanceResponse { @@ -1116,6 +1145,7 @@ message AlterInstanceResponse { message GetInstanceRequest { optional string instance_id = 1; optional string cloud_unique_id = 2; + optional string request_ip = 3; } message GetInstanceResponse { @@ -1144,6 +1174,7 @@ message AlterClusterRequest { optional Operation op = 4; // for SQL mode rename cluster, rename to cluster name eq instance empty cluster name, need drop empty cluster optional bool replace_if_existing_empty_target_cluster = 5; + optional string request_ip = 6; } message AlterClusterResponse { @@ -1156,12 +1187,14 @@ message GetClusterRequest { optional string cluster_id = 3; optional string cluster_name = 4; optional string mysql_user_name = 5; + optional string request_ip = 6; } message GetClusterStatusRequest { repeated string instance_ids = 1; // Redundant field repeated string cloud_unique_ids = 2; optional ClusterStatus status = 3; + optional string request_ip = 4; } message GetClusterStatusResponse { @@ -1182,6 +1215,7 @@ message GetClusterResponse { message GetTabletStatsRequest { optional string cloud_unique_id = 1; repeated TabletIndexPB tablet_idx = 2; + optional string request_ip = 3; } message GetTabletStatsResponse { @@ -1192,6 +1226,7 @@ message GetTabletStatsResponse { message CreateStageRequest { optional string cloud_unique_id = 1; optional StagePB stage = 2; + optional string request_ip = 3; } message CreateStageResponse { @@ -1204,6 +1239,7 @@ message GetStageRequest { optional string mysql_user_name = 3; optional StagePB.StageType type = 4; optional string mysql_user_id = 5; + optional string request_ip = 6; } message GetStageResponse { @@ -1218,6 +1254,7 @@ message DropStageRequest { optional StagePB.StageType type = 4; optional string mysql_user_id = 5; optional string reason = 6; + optional string request_ip = 7; } message DropStageResponse { @@ -1226,6 +1263,7 @@ message DropStageResponse { message GetIamRequest { optional string cloud_unique_id = 1; + optional string request_ip = 2; } message GetIamResponse { @@ -1238,6 +1276,7 @@ message AlterIamRequest { optional string account_id = 1; optional string ak = 2; optional string sk = 3; + optional string request_ip = 4; } message AlterIamResponse { @@ -1247,6 +1286,7 @@ message AlterIamResponse { message AlterRamUserRequest { optional string instance_id = 1; optional RamUserPB ram_user = 2; + optional string request_ip = 3; } message AlterRamUserResponse { @@ -1256,6 +1296,7 @@ message AlterRamUserResponse { message StartTabletJobRequest { optional string cloud_unique_id = 1; // For auth optional TabletJobInfoPB job = 2; + optional string request_ip = 3; } message StartTabletJobResponse { @@ -1274,6 +1315,7 @@ message FinishTabletJobRequest { optional string cloud_unique_id = 1; // For auth optional Action action = 2; optional TabletJobInfoPB job = 3; + optional string request_ip = 4; } message FinishTabletJobResponse { @@ -1295,6 +1337,7 @@ message BeginCopyRequest { optional int64 file_num_limit = 10; optional int64 file_size_limit = 11; optional int64 file_meta_size_limit = 12; + optional string request_ip = 13; } message BeginCopyResponse { @@ -1317,6 +1360,7 @@ message FinishCopyRequest { optional int32 group_id = 6; optional Action action = 7; optional int64 finish_time_ms = 8; + optional string request_ip = 9; } message FinishCopyResponse { @@ -1329,6 +1373,7 @@ message GetCopyJobRequest { optional int64 table_id = 3; optional string copy_id = 4; optional int32 group_id = 5; + optional string request_ip = 6; } message GetCopyJobResponse { @@ -1340,6 +1385,7 @@ message GetCopyFilesRequest { optional string cloud_unique_id = 1; optional string stage_id = 2; optional int64 table_id = 3; + optional string request_ip = 4; } message GetCopyFilesResponse { @@ -1352,6 +1398,7 @@ message FilterCopyFilesRequest { optional string stage_id = 2; optional int64 table_id = 3; repeated ObjectFilePB object_files = 4; + optional string request_ip = 5; } message FilterCopyFilesResponse { @@ -1361,6 +1408,7 @@ message FilterCopyFilesResponse { message RecycleInstanceRequest { repeated string instance_ids = 1; + optional string request_ip = 2; } message StatisticsRecycleRequest { @@ -1472,6 +1520,7 @@ message UpdateDeleteBitmapRequest { optional int64 pre_rowset_agg_end_version = 16; // when update delete_bitmap of pre rowsets, check the rowset exists repeated int64 pre_rowset_versions = 17; + optional string request_ip = 18; } message UpdateDeleteBitmapResponse { @@ -1488,6 +1537,7 @@ message GetDeleteBitmapRequest { optional int64 base_compaction_cnt = 7; optional int64 cumulative_compaction_cnt = 8; optional int64 cumulative_point = 9; + optional string request_ip = 10; } message GetDeleteBitmapResponse { @@ -1506,6 +1556,7 @@ message RemoveDeleteBitmapRequest { repeated string rowset_ids = 3; repeated int64 begin_versions = 4; repeated int64 end_versions = 5; + optional string request_ip = 6; } message RemoveDeleteBitmapResponse { @@ -1535,6 +1586,7 @@ message GetDeleteBitmapUpdateLockRequest { optional int64 expiration = 6; optional bool require_compaction_stats = 7 [default = false]; repeated TabletIndexPB tablet_indexes = 8; + optional string request_ip = 9; } message GetDeleteBitmapUpdateLockResponse { @@ -1551,6 +1603,7 @@ message RemoveDeleteBitmapUpdateLockRequest { optional int64 tablet_id = 3; optional int64 lock_id = 4; optional int64 initiator = 5; + optional string request_ip = 6; } message RemoveDeleteBitmapUpdateLockResponse { @@ -1561,6 +1614,7 @@ message GetRLTaskCommitAttachRequest { optional string cloud_unique_id = 1; // For auth optional int64 db_id = 2; optional int64 job_id = 3; + optional string request_ip = 4; } message GetRLTaskCommitAttachResponse { @@ -1573,6 +1627,7 @@ message ResetRLProgressRequest { optional int64 db_id = 2; optional int64 job_id = 3; map partition_to_offset = 4; + optional string request_ip = 5; } message ResetRLProgressResponse { @@ -1593,7 +1648,8 @@ message CheckKVRequest { } optional string cloud_unique_id = 1; // For auth optional CheckKeyInfos check_keys = 2; - optional Operation op = 3; + optional Operation op = 3; + optional string request_ip = 4; } message CheckKVResponse {