diff --git a/cli/src/monitoring.rs b/cli/src/monitoring.rs index 03480e0..506cc5f 100644 --- a/cli/src/monitoring.rs +++ b/cli/src/monitoring.rs @@ -3,16 +3,17 @@ //monitoring CLI function for identity service use anyhow::Error; use colored::Colorize; +use k8s_openapi::chrono::DateTime; use prost::Message; use prost_types::FileDescriptorProto; use std::result::Result::Ok; -use tonic_reflection::pb::v1::{ server_reflection_response::MessageResponse }; +use tonic_reflection::pb::v1::server_reflection_response::MessageResponse; -use agent_api::client::{ connect_to_client, connect_to_server_reflection }; -use agent_api::requests::{ get_all_features, send_active_connection_request }; +use agent_api::client::{connect_to_client, connect_to_server_reflection}; +use agent_api::requests::{get_all_features, send_active_connection_request}; use clap::command; -use clap::{ Args, Parser, Subcommand }; +use clap::{Args, Parser, Subcommand}; //monitoring subcommands #[derive(Subcommand, Debug, Clone)] @@ -87,14 +88,14 @@ pub async fn list_features() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), "Failed to connect to CortexFlow Server Reflection".red() ); return Err(e); - } + } } Ok(()) } @@ -111,7 +112,11 @@ pub async fn monitor_identity_events() -> Result<(), Error> { if resp.events.is_empty() { println!("{} No events found", "=====>".blue().bold()); } else { - println!("{} Found {} events", "=====>".blue().bold(), resp.events.len()); + println!( + "{} Found {} events", + "=====>".blue().bold(), + resp.events.len() + ); for (i, ev) in resp.events.iter().enumerate() { println!( "{} Event[{}] id: {} src: {} dst: {}", @@ -136,7 +141,7 @@ pub async fn monitor_identity_events() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -163,10 +168,16 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { if resp.metrics.is_empty() { println!("{} No latency metrics found", "=====>".blue().bold()); } else { - println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len()); + println!( + "{} Found {} latency metrics", + "=====>".blue().bold(), + resp.metrics.len() + ); + for (i, metric) in resp.metrics.iter().enumerate() { + let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us); println!( - "index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}", + "{} Latency[{}] \n tgid: {} \n process_name: {} \n address_family: {} \n delta(us): {} \n src_address_v4: {} \n dst_address_v4: {} \n src_address_v6: {} \n dst_address_v6: {} \n local_port: {} \n remote_port: {} \n timestamp_us: {}\n", "=====>".blue().bold(), i, metric.tgid, @@ -179,7 +190,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { format!("{:?}", metric.dst_address_v6), metric.local_port, metric.remote_port, - metric.timestamp_us + converted_timestamp ); } } @@ -196,7 +207,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -220,10 +231,18 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { Ok(response) => { let resp = response.into_inner(); if resp.metrics.is_empty() { - println!("{} No dropped packets metrics found", "=====>".blue().bold()); + println!( + "{} No dropped packets metrics found", + "=====>".blue().bold() + ); } else { - println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len()); + println!( + "{} Found {} dropped packets metrics", + "=====>".blue().bold(), + resp.metrics.len() + ); for (i, metric) in resp.metrics.iter().enumerate() { + let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us); println!( "{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs", "=====>".blue().bold(), @@ -237,7 +256,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { metric.sk_wmem_queued, metric.sk_rcvbuf, metric.sk_ack_backlog, - metric.timestamp_us + converted_timestamp ); } } @@ -254,7 +273,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -264,4 +283,9 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { } } Ok(()) -} \ No newline at end of file +} + +fn convert_timestamp_to_date(timestamp:u64)->String{ + let datetime = DateTime::from_timestamp_micros(timestamp as i64).unwrap(); + datetime.to_string() +} diff --git a/core/src/components/identity/Cargo.toml b/core/src/components/identity/Cargo.toml index 6d9703e..54e17b7 100644 --- a/core/src/components/identity/Cargo.toml +++ b/core/src/components/identity/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cortexflow_identity" -version = "0.1.1-beta.1" +version = "0.1.1-beta.2" edition = "2024" description = "CortexFlow identity service package" license = "Apache-2.0" @@ -10,23 +10,31 @@ homepage = "https://docs.cortexflow.org" repository = "https://github.com/CortexFlow/CortexBrain" [features] -default = ["map-handlers","struct","enums"] +default = ["map-handlers", "struct", "enums"] map-handlers = [] struct = [] enums = [] +experimental = ["map-handlers", "struct", "enums"] [dependencies] aya = "0.13.1" bytes = "1.4" -tokio = { version = "1.48.0", features = ["rt","rt-multi-thread","fs","signal","fs","time","macros"] } +tokio = { version = "1.48.0", features = [ + "rt", + "rt-multi-thread", + "signal", + "fs", + "time", + "macros", +] } anyhow = "1.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } libc = "0.2.172" -bytemuck = {version ="1.23.0",features = ["derive"]} +bytemuck = { version = "1.23.0", features = ["derive"] } bytemuck_derive = "1.10.1" cortexbrain-common = { path = "../../../common" } nix = { version = "0.30.1", features = ["net"] } -kube = {version = "2.0.1",features = ["client"]} -k8s-openapi = {version ="0.26.0", features = ["v1_34"]} +kube = { version = "2.0.1", features = ["client"] } +k8s-openapi = { version = "0.26.0", features = ["v1_34"] } diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index 5e236e3..7855edc 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -9,18 +9,14 @@ use aya::{ programs::{SchedClassifier, TcAttachType}, }; use bytes::BytesMut; -use cortexbrain_common::constants; use k8s_openapi::api::core::v1::Pod; use kube::api::ObjectList; use kube::{Api, Client}; use nix::net::if_::if_nameindex; use std::collections::HashMap; use std::fs; -use std::hash::Hash; -use std::path::PathBuf; use std::result::Result::Ok; use std::sync::Mutex; -use std::time::Duration; use std::{ borrow::BorrowMut, net::Ipv4Addr, @@ -31,7 +27,6 @@ use std::{ }; use tokio::time; use tracing::{debug, error, info, warn}; -use tracing_subscriber::fmt::format; /* * TryFrom Trait implementation for IpProtocols enum @@ -295,14 +290,6 @@ pub async fn display_tcp_registry_events>( let command_str = String::from_utf8_lossy(&command[..end]).to_string(); let cgroup_id = tcp_pl.cgroup_id; - // construct the parent path - //let proc_path = PathBuf::from("/proc") - // .join(event_id.to_string()) - // .join("cgroup"); - - //let proc_content = fs::read_to_string(&proc_path); - //match proc_content { - // Ok(proc_content) => { match IpProtocols::try_from(tcp_pl.proto) { std::result::Result::Ok(proto) => { info!( @@ -324,13 +311,6 @@ pub async fn display_tcp_registry_events>( ); } }; - //} - //Err(e) => - // eprintln!( - // "An error occured while accessing the content from the {:?} path: {}", - // &proc_path, - // e - // ), } else { warn!("Received packet data too small: {} bytes", data.len()); } @@ -345,6 +325,7 @@ pub async fn display_tcp_registry_events>( } } +#[cfg(feature = "experimental")] pub async fn scan_cgroup_paths(path: String) -> Result, Error> { let mut cgroup_paths: Vec = Vec::new(); let default_path = "/sys/fs/cgroup/kubepods.slice".to_string(); @@ -380,10 +361,16 @@ pub async fn scan_cgroup_paths(path: String) -> Result, Error> { Ok(cgroup_paths) } +#[cfg(feature = "experimental")] +struct ServiceIdentity { + uid: String, + container_id: String, +} + +#[cfg(feature = "experimental")] pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { let interval = std::time::Duration::from_secs(time_delta); - let mut discovered_pods = HashMap::::new(); - while true { + loop { let scanned_paths = scan_cgroup_paths("/sys/fs/cgroup/kubelet.slice".to_string()) .await .expect("An error occured during the cgroup scan"); @@ -423,6 +410,7 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { match subpaths_v2 { Ok(paths) => { for sub2 in paths { + info!("Debugging sub2: {}", &sub2); //return e.g. /sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podb8701d38_3791_422d_ad15_890ad1a0844b.slice/docker-f2e265659293676231ecb38fafccc97b1a42b75be192c32a602bc8ea579dc866.scope scanned_subpaths_v2.push(sub2); // this contains the addressed like this //kubelet-kubepods-besteffort-pod088f8704_24f0_4636_a8e2_13f75646f370.slice @@ -435,21 +423,33 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { } } - //read the subpaths let mut uids = Vec::::new(); + let mut identites = Vec::::new(); + + //read the subpaths to extract the pod uid for subpath in scanned_subpaths_v2 { let uid = extract_pod_uid(subpath.clone()) .expect("An error occured during the extraction of pod UIDs"); + let container_id = extract_container_id(subpath.clone()) + .expect("An error occured during the extraction of the docker container id"); debug!("Debugging extracted UID: {:?}", &uid); - uids.push(uid); + // create a linked list for each service + let service_identity = ServiceIdentity { uid, container_id }; + identites.push(service_identity); //push the linked list in a vector of ServiceIdentity structure. Each struct contains the uid and the container id } + // get pod information from UID and store the info in an HashMqp for O(1) access let service_map = get_pod_info().await?; - for (uid) in uids { - if let Some(name) = service_map.get(&uid) { - info!("UID (from eBPF): {} name:(from K8s): {}", &uid, name); - } + //info!("Debugging Identites vector: {:?}", identites); + for service in identites { + let name = service_cache(service_map.clone(), service.uid.clone()); + let uid = service.uid; + let id = service.container_id; + info!( + "[Identity]: name: {:?} uid: {:?} docker container id {:?} ", + name, uid, id + ); } info!( @@ -458,10 +458,27 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { ); time::sleep(interval).await; } +} +#[cfg(feature = "experimental")] +fn service_cache(service_map: HashMap, uid: String) -> String { + service_map.get(&uid).cloned().unwrap_or_else(|| { + error!("Service not found for uid: {}", uid); + "unknown".to_string() + }) +} +#[cfg(feature = "experimental")] +fn extract_container_id(cgroup_path: String) -> Result { + let splits: Vec<&str> = cgroup_path.split("/").collect(); - Ok(()) + let index = extract_target_from_splits(splits.clone(), "docker-")?; + let docker_id_split = splits[index] + .trim_start_matches("docker-") + .trim_end_matches(".scope"); + Ok(docker_id_split.to_string()) } +// IDEA: add cgroup docker process mapping in ServiceIdentity structure +#[cfg(feature = "experimental")] fn extract_pod_uid(cgroup_path: String) -> Result { // example of cgroup path: // /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod93580201_87d5_44e6_9779_f6153ca17637.slice @@ -470,12 +487,9 @@ fn extract_pod_uid(cgroup_path: String) -> Result { // split the path by "/" let splits: Vec<&str> = cgroup_path.split("/").collect(); - let mut uid_vec = Vec::::new(); debug!("Debugging splits: {:?}", &splits); - let mut pod_split_vec = Vec::::new(); - - let index = extract_target_from_splits(splits.clone())?; + let index = extract_target_from_splits(splits.clone(), "-pod")?; let pod_split = splits[index] .trim_start_matches("kubelet-kubepods-besteffort-") @@ -490,11 +504,11 @@ fn extract_pod_uid(cgroup_path: String) -> Result { let uid = uid_.replace("_", "-"); Ok(uid.to_string()) } - -fn extract_target_from_splits(splits: Vec<&str>) -> Result { +#[cfg(feature = "experimental")] +fn extract_target_from_splits(splits: Vec<&str>, target: &str) -> Result { for (index, split) in splits.iter().enumerate() { // find the split that contains the word 'pod' - if split.contains("-pod") { + if split.contains(target) { debug!("Target index; {}", index); return Ok(index); } @@ -503,6 +517,7 @@ fn extract_target_from_splits(splits: Vec<&str>) -> Result { } /* unfortunately you cannot query the pods using the uids directly from ListParams */ +#[cfg(feature = "experimental")] async fn query_all_pods() -> Result, Error> { let client = Client::try_default() .await @@ -518,6 +533,7 @@ async fn query_all_pods() -> Result, Error> { } // fast pod caching system +#[cfg(feature = "experimental")] async fn get_pod_info() -> Result, Error> { let all_pods = query_all_pods().await?; @@ -527,15 +543,16 @@ async fn get_pod_info() -> Result, Error> { if let (Some(name), Some(uid)) = (pod.metadata.name, pod.metadata.uid) { service_map.insert(uid, name); } - } + } // insert the pod name and uid from the KubeAPI Ok(service_map) } +#[cfg(feature = "experimental")] mod tests { use tracing_subscriber::fmt::format; - use crate::helpers::{extract_pod_uid, extract_target_from_splits}; + use crate::helpers::{extract_container_id, extract_pod_uid, extract_target_from_splits}; #[test] fn extract_uid_from_string() { @@ -566,12 +583,29 @@ mod tests { let mut index_vec = Vec::::new(); for cgroup_path in cgroup_paths { - let mut splits: Vec<&str> = cgroup_path.split("/").collect(); + let splits: Vec<&str> = cgroup_path.split("/").collect(); - let target_index = extract_target_from_splits(splits).unwrap(); + let target_index = extract_target_from_splits(splits, "-pod").unwrap(); index_vec.push(target_index); } let index_check = vec![6, 7]; assert_eq!(index_vec, index_check); } + + #[test] + fn extract_docker_id() { + let cgroup_paths = vec!["/sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string(), + "/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string()]; + + let mut id_vec = Vec::::new(); + for cgroup_path in cgroup_paths { + let id = extract_container_id(cgroup_path).unwrap(); + id_vec.push(id); + } + let id_check = vec![ + "13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(), + "13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(), + ]; + assert_eq!(id_vec, id_check); + } } diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 1e7ca94..5688715 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -1,13 +1,12 @@ /* * CortexBrain Identity Service - * Open Issues: #105 #107 * Features: - * 1. TCP, UDP , ICMP events tracker - * 2. Track Connections using a PerfEventArray named ConnArray - * 3. Track veth creation and deletion events + * 1. TCP events tracker + * 2. veth creation and deletion tracker + * 3. TC (traffic control) tracker + * 4. [Experimental]: cgroup scanner * */ -#![allow(unused_mut)] #![allow(warnings)] mod enums; @@ -27,8 +26,11 @@ use aya::{ use crate::helpers::{ display_events, display_tcp_registry_events, display_veth_events, get_veth_channels, - scan_cgroup_cronjob, }; + +#[cfg(feature = "experimental")] +use crate::helpers::scan_cgroup_cronjob; + use crate::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; use bytes::BytesMut; @@ -321,12 +323,51 @@ async fn event_listener( display_tcp_registry_events(tcp_registry_buffer, tcp_registry_running, tcp_buffers).await; }); + #[cfg(feature = "experimental")] let scan_cgroup_cronjob = tokio::spawn(async move { let _ = scan_cgroup_cronjob(180).await; }); + #[cfg(not(feature = "experimental"))] + tokio::select! { + /* result = scan_cgroup_cronjob=>{ + match result{ + Err(e)=>error!("scan_cgroup_cronjob panicked {:?}",e), + std::result::Result::Ok(_) => info!("cgroup scan cronjob exited"), + } + } */ + result = veth_events_displayer=>{ + match result{ + Err(e)=>error!("veth_event_displayer panicked {:?}",e), + std::result::Result::Ok(_) => info!("Found new veth_event"), + } + } + + result = net_events_displayer=>{ + match result{ + Err(e)=>error!("net_event_displayer panicked {:?}",e), + std::result::Result::Ok(_) => info!("Found new net_event"), + } + } + + result = tcp_registry_events_displayer => { + match result{ + Err(e)=>error!("tcp_registry_events_displayer panicked {:?}",e), + std::result::Result::Ok(_)=>info!("Found new tcp_register event") + } + } + + _= signal::ctrl_c()=>{ + info!("Triggered Exiting..."); + veth_running_signal.store(false, Ordering::SeqCst); + net_events_running_signal.store(false, Ordering::SeqCst); + tcp_registry_running_signal.store(false, Ordering::SeqCst); + } + + } + #[cfg(feature = "experimental")] tokio::select! { - result = scan_cgroup_cronjob=>{ + result = scan_cgroup_cronjob=>{ match result{ Err(e)=>error!("scan_cgroup_cronjob panicked {:?}",e), std::result::Result::Ok(_) => info!("cgroup scan cronjob exited"), diff --git a/core/src/testing/agent.yaml b/core/src/testing/agent.yaml index c31e7b0..e5c54f0 100644 --- a/core/src/testing/agent.yaml +++ b/core/src/testing/agent.yaml @@ -19,7 +19,7 @@ spec: hostNetwork: true containers: - name: agent - image: lorenzotettamanti/cortexflow-agent:0.1.1-beta.1 + image: lorenzotettamanti/cortexflow-agent:latest command: ["/bin/bash", "-c"] args: - | diff --git a/core/src/testing/identity.yaml b/core/src/testing/identity.yaml index 675f60d..44fc5b9 100644 --- a/core/src/testing/identity.yaml +++ b/core/src/testing/identity.yaml @@ -53,7 +53,7 @@ spec: - SYS_PTRACE containers: - name: identity - image: lorenzotettamanti/cortexflow-identity:0.1.1-cgroup_scannerv_exp + image: lorenzotettamanti/cortexflow-identity:latest command: ["/bin/bash", "-c"] args: - |