From 301be351b074639f742981beed10ec0a91e8f33e Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Fri, 16 Jan 2026 10:54:24 +0100 Subject: [PATCH 1/6] [#158]: moved map_handlers module from identity crate to common crate --- core/Cargo.lock | 6 +- core/common/Cargo.toml | 6 ++ core/common/src/lib.rs | 3 +- .../identity => common}/src/map_handlers.rs | 96 +++++++++++-------- core/src/components/identity/src/lib.rs | 3 +- 5 files changed, 68 insertions(+), 46 deletions(-) rename core/{src/components/identity => common}/src/map_handlers.rs (56%) diff --git a/core/Cargo.lock b/core/Cargo.lock index 506d5dc4..e980659b 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -406,6 +406,9 @@ name = "cortexbrain-common" version = "0.1.0" dependencies = [ "anyhow", + "aya", + "k8s-openapi", + "kube", "tracing", "tracing-subscriber", ] @@ -453,10 +456,9 @@ dependencies = [ "bytemuck", "bytemuck_derive", "bytes", - "cortexbrain-common 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cortexbrain-common 0.1.0", "k8s-openapi", "kube", - "libc", "nix", "tokio", "tracing", diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 70545781..ac87f689 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -13,3 +13,9 @@ repository = "https://github.com/CortexFlow/CortexBrain" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } anyhow = "1.0" +kube = { version = "2.0.1", features = ["client"] } +k8s-openapi = { version = "0.26.0", features = ["v1_34"] } +aya = "0.13.1" + +[features] +map-handlers = [] diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index f8fadc66..2f8e5635 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,3 +1,4 @@ pub mod constants; +pub mod formatters; pub mod logger; -pub mod formatters; \ No newline at end of file +pub mod map_handlers; diff --git a/core/src/components/identity/src/map_handlers.rs b/core/common/src/map_handlers.rs similarity index 56% rename from core/src/components/identity/src/map_handlers.rs rename to core/common/src/map_handlers.rs index a225a470..43330fab 100644 --- a/core/src/components/identity/src/map_handlers.rs +++ b/core/common/src/map_handlers.rs @@ -13,39 +13,49 @@ use std::sync::Mutex; use tracing::warn; use tracing::{error, info}; -pub fn init_bpf_maps(bpf: Arc>) -> Result<(Map, Map, Map, Map), anyhow::Error> { - // this function init the bpfs maps used in the main program - /* - index 0: events_map - index 1: veth_map - index 2: blocklist map - */ - let mut bpf_new = bpf.lock().unwrap(); +// docs +// +// this function init the bpfs maps used in the main program +// +// index 0: events_map +// index 1: veth_map +// index 2: blocklist map +// index 3: tcp_registry map +// - let events_map = bpf_new - .take_map("EventsMap") - .ok_or_else(|| anyhow::anyhow!("EventsMap map not found"))?; - - let veth_map = bpf_new - .take_map("veth_identity_map") - .ok_or_else(|| anyhow::anyhow!("veth_identity_map map not found"))?; - - let blocklist_map = bpf_new - .take_map("Blocklist") - .ok_or_else(|| anyhow::anyhow!("Blocklist map not found"))?; +#[cfg(feature = "map-handlers")] +pub struct BpfMapsData { + pub bpf_obj_names: Vec, + pub bpf_obj_map: Vec, +} - let tcp_registry_map = bpf_new - .take_map("TcpPacketRegistry") - .ok_or_else(|| anyhow::anyhow!("TcpPacketRegistry map not found"))?; +#[cfg(feature = "map-handlers")] +pub fn init_bpf_maps( + bpf: Arc>, + map_names: Vec, +) -> Result { + let mut bpf_new = bpf.lock().expect("Cannot get value from lock"); + let mut maps = Vec::new(); // stores bpf_maps_objects - Ok((events_map, veth_map, blocklist_map, tcp_registry_map)) + for name in &map_names { + let bpf_map_init = bpf_new + .take_map(&name) + .ok_or_else(|| anyhow::anyhow!("{} map not found", &name))?; + maps.push(bpf_map_init); + } + Ok(BpfMapsData { + bpf_obj_names: map_names.clone(), + bpf_obj_map: maps, + }) } //TODO: save bpf maps path in the cli metadata + //takes an array of bpf maps and pin them to persiste session data -//TODO: change maps type with a Vec instead of (Map,Map). This method is only for fast development and it's not optimized -//TODO: add bpf mounts during cli installation -pub fn map_pinner(maps: &(Map, Map, Map, Map), path: &PathBuf) -> Result<(), Error> { +// FIXME: is this ok that we are returning a BpfMapsData? + +#[cfg(feature = "map-handlers")] +pub fn map_pinner(maps: BpfMapsData, path: &PathBuf) -> Result, Error> { if !path.exists() { info!("Pin path {:?} does not exist. Creating it...", path); std::fs::create_dir_all(&path)?; @@ -56,28 +66,32 @@ pub fn map_pinner(maps: &(Map, Map, Map, Map), path: &PathBuf) -> Result<(), Err } } - let configs = [ - (&maps.0, "events_map"), - (&maps.1, "veth_map"), - (&maps.2, "blocklist_map"), - (&maps.3, "tcp_packet_registry"), - ]; - - for (name, paths) in configs { - let map_path = path.join(paths); + let mut owned_maps = Vec::new(); // aya::Maps does not implement the clone trait i need to create a raw copy of the vec map + // an iterator that iterates two iterators simultaneously + for (map_obj, name) in maps + .bpf_obj_map + .into_iter() + .zip(maps.bpf_obj_names.into_iter()) + { + let map_path = path.join(&name); if map_path.exists() { - warn!("Path {} already exists", paths); - warn!("Removing path {}", paths); - let _ = std::fs::remove_file(&map_path); + warn!("Path {} already exists", name); + warn!("Removing path {}", name); + std::fs::remove_file(&map_path)?; } info!("Trying to pin map {:?} in map path: {:?}", name, &map_path); - name.pin(&map_path)?; + map_obj.pin(&map_path)?; + owned_maps.push(map_obj); } - Ok(()) + Ok(owned_maps) } + +#[cfg(feature = "map-handlers")] pub async fn populate_blocklist(map: &mut Map) -> Result<(), Error> { - let client = Client::try_default().await.unwrap(); + let client = Client::try_default() + .await + .expect("Cannot connect to Kubernetes Client"); let namespace = "cortexflow"; let configmap = "cortexbrain-client-config"; diff --git a/core/src/components/identity/src/lib.rs b/core/src/components/identity/src/lib.rs index e3bb59e0..54134144 100644 --- a/core/src/components/identity/src/lib.rs +++ b/core/src/components/identity/src/lib.rs @@ -1,4 +1,3 @@ pub mod helpers; pub mod structs; -pub mod enums; -pub mod map_handlers; \ No newline at end of file +pub mod enums; \ No newline at end of file From 1385bcfcb09dbf94a5994b804b03b064a27a425a Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Fri, 16 Jan 2026 10:54:52 +0100 Subject: [PATCH 2/6] [#158]: simplified identity logic. removed duplicated code and functions --- core/src/components/identity/Cargo.toml | 8 +- core/src/components/identity/src/helpers.rs | 12 +- core/src/components/identity/src/main.rs | 195 +++++++++----------- core/src/components/identity/src/mod.rs | 3 +- core/src/components/identity/src/structs.rs | 26 +-- 5 files changed, 112 insertions(+), 132 deletions(-) diff --git a/core/src/components/identity/Cargo.toml b/core/src/components/identity/Cargo.toml index 08d753eb..3146991c 100644 --- a/core/src/components/identity/Cargo.toml +++ b/core/src/components/identity/Cargo.toml @@ -10,11 +10,10 @@ homepage = "https://docs.cortexflow.org" repository = "https://github.com/CortexFlow/CortexBrain" [features] -default = ["map-handlers", "struct", "enums"] -map-handlers = [] +default = ["struct", "enums"] struct = [] enums = [] -experimental = ["map-handlers", "struct", "enums"] +experimental = ["struct", "enums"] [dependencies] @@ -31,10 +30,9 @@ tokio = { version = "1.48.0", features = [ anyhow = "1.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -libc = "0.2.172" bytemuck = { version = "1.23.0", features = ["derive"] } bytemuck_derive = "1.10.1" -cortexbrain-common = "0.1.0" +cortexbrain-common = { path = "../../../common/", features = ["map-handlers"] } nix = { version = "0.30.1", features = ["net"] } kube = { version = "2.0.1", features = ["client"] } k8s-openapi = { version = "0.26.0", features = ["v1_34"] } diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index 7855edc4..05b96032 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -49,10 +49,10 @@ impl TryFrom for IpProtocols { /* helper functions to read and log net events in the container */ pub async fn display_events>( mut perf_buffers: Vec>, - running: Arc, + //running: Arc, mut buffers: Vec, ) { - while running.load(Ordering::SeqCst) { + while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { std::result::Result::Ok(events) => { @@ -105,11 +105,11 @@ pub fn reverse_be_addr(addr: u32) -> Ipv4Addr { pub async fn display_veth_events>( bpf: Arc>, mut perf_buffers: Vec>, - running: Arc, + //running: Arc, mut buffers: Vec, mut link_ids: Arc>>, ) { - while running.load(Ordering::SeqCst) { + while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { std::result::Result::Ok(events) => { @@ -265,10 +265,10 @@ async fn attach_detach_veth( /* helper functions to display events from the TcpPacketRegistry structure */ pub async fn display_tcp_registry_events>( mut perf_buffers: Vec>, - running: Arc, + //running: Arc, mut buffers: Vec, ) { - while running.load(Ordering::SeqCst) { + while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { std::result::Result::Ok(events) => { diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 56887158..9dd6ce94 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -7,46 +7,36 @@ * 4. [Experimental]: cgroup scanner * */ -#![allow(warnings)] mod enums; mod helpers; -mod map_handlers; mod structs; +use crate::helpers::{ + display_events, display_tcp_registry_events, display_veth_events, get_veth_channels, +}; use aya::{ Ebpf, - maps::{ - Map, MapData, - perf::{PerfEventArray, PerfEventArrayBuffer}, - }, + maps::{Map, perf::PerfEventArray}, programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId}, util::online_cpus, }; -use crate::helpers::{ - display_events, display_tcp_registry_events, display_veth_events, get_veth_channels, -}; - #[cfg(feature = "experimental")] use crate::helpers::scan_cgroup_cronjob; -use crate::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; - use bytes::BytesMut; +use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; use std::{ convert::TryInto, path::Path, - sync::{ - Arc, Mutex, - atomic::{AtomicBool, Ordering}, - }, + sync::{Arc, Mutex}, }; use anyhow::{Context, Ok}; use cortexbrain_common::{constants, logger}; use tokio::{fs, signal}; -use tracing::{error, info}; +use tracing::{debug, error, info}; use std::collections::HashMap; @@ -72,14 +62,19 @@ async fn main() -> Result<(), anyhow::Error> { let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?)); let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) .context("PIN_MAP_PATH environment variable required")?; - - match init_bpf_maps(bpf.clone()) { - std::result::Result::Ok(mut bpf_maps) => { + let data = vec![ + "EventsMap".to_string(), + "veth_identity_map".to_string(), + //"Blocklist".to_string(), + "TcpPacketRegistry".to_string(), + ]; + match init_bpf_maps(bpf.clone(), data) { + std::result::Result::Ok(bpf_maps) => { info!("Successfully loaded bpf maps"); let pin_path = std::path::PathBuf::from(&bpf_map_save_path); info!("About to call map_pinner with path: {:?}", pin_path); - match map_pinner(&bpf_maps, &pin_path) { - std::result::Result::Ok(_) => { + match map_pinner(bpf_maps, &pin_path) { + std::result::Result::Ok(maps) => { info!("maps pinned successfully"); //load veth_trace program ref veth_trace.rs { @@ -90,9 +85,9 @@ async fn main() -> Result<(), anyhow::Error> { info!("Found interfaces: {:?}", interfaces); - { - populate_blocklist(&mut bpf_maps.2).await; - } + //{ FIXME: paused for testing the other features + // populate_blocklist(&mut maps.2).await?; + //} { init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()).await.context( @@ -105,9 +100,11 @@ async fn main() -> Result<(), anyhow::Error> { )?; } - event_listener(bpf_maps, link_ids.clone(), bpf.clone()) + event_listener(maps, link_ids.clone(), bpf.clone()) .await - .context("Error initializing event_listener")?; + .map_err(|e| { + anyhow::anyhow!("Error inizializing event_listener. Reason: {}", e) + })?; } Err(e) => { error!("Error while pinning bpf_maps: {}", e); @@ -116,7 +113,7 @@ async fn main() -> Result<(), anyhow::Error> { } Err(e) => { error!("Error while loading bpf maps {}", e); - signal::ctrl_c(); + let _ = signal::ctrl_c().await; } } @@ -132,7 +129,9 @@ async fn init_tc_classifier( //this funtion initialize the tc classifier program info!("Loading programs"); - let mut bpf_new = bpf.lock().unwrap(); + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; let program: &mut SchedClassifier = bpf_new .program_mut("identity_classifier") @@ -151,7 +150,9 @@ async fn init_tc_classifier( "Program 'identity_classifier' attached to interface {}", interface ); - let mut map = link_ids.lock().unwrap(); + let mut map = link_ids + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; map.insert(interface.clone(), link_id); } Err(e) => error!( @@ -167,7 +168,9 @@ async fn init_tc_classifier( async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { //this functions init the veth_tracer used to make the InterfacesRegistry - let mut bpf_new = bpf.lock().unwrap(); + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; //creation tracer let veth_creation_tracer: &mut KProbe = bpf_new @@ -199,7 +202,9 @@ async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { } async fn init_tcp_registry(bpf: Arc>) -> Result<(), anyhow::Error> { - let mut bpf_new = bpf.lock().unwrap(); + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; // init tcp registry let tcp_analyzer: &mut KProbe = bpf_new @@ -236,91 +241,81 @@ async fn init_tcp_registry(bpf: Arc>) -> Result<(), anyhow::Error> { Ok(()) } +// this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications) +// Doc: +// +// perf_net_events_array: contains is associated with the network events stored in the events_map (EventsMap) +// perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map) +// +// async fn event_listener( - bpf_maps: (Map, Map, Map, Map), + bpf_maps: Vec, link_ids: Arc>>, bpf: Arc>, ) -> Result<(), anyhow::Error> { - // this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications) - /* Doc: - - perf_net_events_array: contains is associated with the network events stored in the events_map (EventsMap) - perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map) - - */ - info!("Preparing perf_buffers and perf_arrays"); //TODO: try to change from PerfEventArray to a RingBuffer data structure - //let m0=bpf_maps[0]; - //let m1 = bpf_maps[1]; - //let mut ring1=RingBuf::try_from(m0)?; - //let mut ring2=RingBuf::try_from(m1)?; - - //TODO:create an helper function that initialize the data structures and the running - // init PerfEventArrays - let mut perf_veth_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.1)?; - let mut perf_net_events_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.0)?; - let mut tcp_registry_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.3)?; - - // init PerfEventArrays buffers - let mut perf_veth_buffer: Vec> = Vec::new(); - let mut perf_net_events_buffer: Vec> = Vec::new(); - let mut tcp_registry_buffer: Vec> = Vec::new(); - - // fill the input buffers - - for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let veth_buf: PerfEventArrayBuffer = perf_veth_array.open(cpu_id, None)?; - perf_veth_buffer.push(veth_buf); - } - for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let events_buf: PerfEventArrayBuffer = perf_net_events_array.open(cpu_id, None)?; - perf_net_events_buffer.push(events_buf); + + let mut perf_event_arrays = Vec::new(); // contains a vector of PerfEventArrays + let mut event_buffers = Vec::new(); // contains a vector of buffers + + // create the PerfEventArrays and the buffers + for map in bpf_maps { + debug!("Debugging map type:{:?}", map); + let perf_event_array = PerfEventArray::try_from(map).map_err(|e| { + error!("Cannot create perf_event_array for map.Reason: {}", e); + anyhow::anyhow!("Cannot create perf_event_array for map.Reason: {}", e) + })?; + perf_event_arrays.push(perf_event_array); // this is step 1 + let perf_event_array_buffer = Vec::new(); + event_buffers.push(perf_event_array_buffer); //this is step 2 } - for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let tcp_registry_buf: PerfEventArrayBuffer = - tcp_registry_array.open(cpu_id, None)?; - tcp_registry_buffer.push(tcp_registry_buf); + + // fill the input buffers with data from the PerfEventArrays + let cpus = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?; + + for (perf_evt_array, perf_evt_array_buffer) in + perf_event_arrays.iter_mut().zip(event_buffers.iter_mut()) + { + for cpu_id in &cpus { + let single_buffer = perf_evt_array.open(*cpu_id, None)?; + perf_evt_array_buffer.push(single_buffer); + } } info!("Listening for events..."); - // init runnings - let veth_running = Arc::new(AtomicBool::new(true)); - let net_events_running = Arc::new(AtomicBool::new(true)); - let tcp_registry_running = Arc::new(AtomicBool::new(true)); + let mut event_buffers = event_buffers.into_iter(); + let perf_veth_buffer = event_buffers + .next() + .expect("Cannot create perf_veth buffer"); + let perf_net_events_buffer = event_buffers + .next() + .expect("Cannot create perf_net_events buffer"); + let tcp_registry_buffer = event_buffers + .next() + .expect("Cannot create tcp_registry buffer"); // init output buffers - let mut veth_buffers = vec![BytesMut::with_capacity(1024); 10]; - let mut events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - let mut tcp_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - - // init running signals - let veth_running_signal = veth_running.clone(); - let net_events_running_signal = net_events_running.clone(); - let tcp_registry_running_signal = tcp_registry_running.clone(); + let veth_buffers = vec![BytesMut::with_capacity(1024); 10]; + let events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; + let tcp_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - let veth_link_ids = link_ids.clone(); + // init veth link ids + let veth_link_ids = link_ids; + // spawn async tasks let veth_events_displayer = tokio::spawn(async move { - display_veth_events( - bpf.clone(), - perf_veth_buffer, - veth_running, - veth_buffers, - veth_link_ids, - ) - .await; + display_veth_events(bpf.clone(), perf_veth_buffer, veth_buffers, veth_link_ids).await; }); - // IDEA: Maybe we don't need to display all this events let net_events_displayer = tokio::spawn(async move { - display_events(perf_net_events_buffer, net_events_running, events_buffers).await; + display_events(perf_net_events_buffer, events_buffers).await; }); let tcp_registry_events_displayer: tokio::task::JoinHandle<()> = tokio::spawn(async move { - display_tcp_registry_events(tcp_registry_buffer, tcp_registry_running, tcp_buffers).await; + display_tcp_registry_events(tcp_registry_buffer, tcp_buffers).await; }); #[cfg(feature = "experimental")] @@ -330,12 +325,6 @@ async fn event_listener( #[cfg(not(feature = "experimental"))] tokio::select! { - /* result = scan_cgroup_cronjob=>{ - match result{ - Err(e)=>error!("scan_cgroup_cronjob panicked {:?}",e), - std::result::Result::Ok(_) => info!("cgroup scan cronjob exited"), - } - } */ result = veth_events_displayer=>{ match result{ Err(e)=>error!("veth_event_displayer panicked {:?}",e), @@ -359,9 +348,6 @@ async fn event_listener( _= signal::ctrl_c()=>{ info!("Triggered Exiting..."); - veth_running_signal.store(false, Ordering::SeqCst); - net_events_running_signal.store(false, Ordering::SeqCst); - tcp_registry_running_signal.store(false, Ordering::SeqCst); } } @@ -396,9 +382,6 @@ async fn event_listener( _= signal::ctrl_c()=>{ info!("Triggered Exiting..."); - veth_running_signal.store(false, Ordering::SeqCst); - net_events_running_signal.store(false, Ordering::SeqCst); - tcp_registry_running_signal.store(false, Ordering::SeqCst); } } diff --git a/core/src/components/identity/src/mod.rs b/core/src/components/identity/src/mod.rs index e3bb59e0..54134144 100644 --- a/core/src/components/identity/src/mod.rs +++ b/core/src/components/identity/src/mod.rs @@ -1,4 +1,3 @@ pub mod helpers; pub mod structs; -pub mod enums; -pub mod map_handlers; \ No newline at end of file +pub mod enums; \ No newline at end of file diff --git a/core/src/components/identity/src/structs.rs b/core/src/components/identity/src/structs.rs index d8cff939..7e2aa2b0 100644 --- a/core/src/components/identity/src/structs.rs +++ b/core/src/components/identity/src/structs.rs @@ -19,17 +19,17 @@ unsafe impl aya::Pod for PacketLog {} /* * Connection Array that contains the hash_id associated with an active connection */ -#[repr(C)] -#[derive(Clone, Copy, Zeroable)] -pub struct ConnArray { - pub src_ip: u32, - pub dst_ip: u32, - pub src_port: u16, - pub dst_port: u16, - pub proto: u8, -} +//#[repr(C)] +//#[derive(Clone, Copy, Zeroable)] +//pub struct ConnArray { +// pub src_ip: u32, +// pub dst_ip: u32, +// pub src_port: u16, +// pub dst_port: u16, +// pub proto: u8, +//} -unsafe impl aya::Pod for ConnArray {} +//unsafe impl aya::Pod for ConnArray {} #[repr(C)] #[derive(Clone, Copy)] @@ -44,13 +44,13 @@ pub struct VethLog { #[repr(C)] #[derive(Clone, Copy)] -pub struct TcpPacketRegistry{ +pub struct TcpPacketRegistry { pub proto: u8, pub src_ip: u32, pub dst_ip: u32, pub src_port: u16, pub dst_port: u16, pub pid: u32, - pub command: [u8;16], + pub command: [u8; 16], pub cgroup_id: u64, -} \ No newline at end of file +} From b9edd1dac7a98a8565ce056b110b3187e7d49d08 Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Thu, 22 Jan 2026 21:07:47 +0100 Subject: [PATCH 3/6] [#158]: added program handlers function in the common crate. Remove duplicated code in metrics module --- core/common/Cargo.toml | 1 + core/common/src/lib.rs | 3 + core/common/src/map_handlers.rs | 4 - core/common/src/program_handlers.rs | 42 +++++++ core/src/components/metrics/Cargo.toml | 16 ++- core/src/components/metrics/src/helpers.rs | 107 +++++++++++------- core/src/components/metrics/src/main.rs | 72 ++++++------ .../components/metrics/src/maps_handlers.rs | 48 -------- core/src/components/metrics/src/mod.rs | 4 +- .../metrics/src/program_handlers.rs | 59 ---------- 10 files changed, 165 insertions(+), 191 deletions(-) create mode 100644 core/common/src/program_handlers.rs delete mode 100644 core/src/components/metrics/src/maps_handlers.rs delete mode 100644 core/src/components/metrics/src/program_handlers.rs diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index ac87f689..854c04e5 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -19,3 +19,4 @@ aya = "0.13.1" [features] map-handlers = [] +program-handlers = [] diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 2f8e5635..1d015a27 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,4 +1,7 @@ pub mod constants; pub mod formatters; pub mod logger; +#[cfg(feature = "map-handlers")] pub mod map_handlers; +#[cfg(feature = "program-handlers")] +pub mod program_handlers; \ No newline at end of file diff --git a/core/common/src/map_handlers.rs b/core/common/src/map_handlers.rs index 43330fab..2882d66f 100644 --- a/core/common/src/map_handlers.rs +++ b/core/common/src/map_handlers.rs @@ -17,10 +17,6 @@ use tracing::{error, info}; // // this function init the bpfs maps used in the main program // -// index 0: events_map -// index 1: veth_map -// index 2: blocklist map -// index 3: tcp_registry map // #[cfg(feature = "map-handlers")] diff --git a/core/common/src/program_handlers.rs b/core/common/src/program_handlers.rs new file mode 100644 index 00000000..8832daff --- /dev/null +++ b/core/common/src/program_handlers.rs @@ -0,0 +1,42 @@ +use aya::{Ebpf, programs::KProbe}; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; +use tracing::{error, info}; + +#[cfg(feature = "program-handlers")] +pub fn load_program( + bpf: Arc>, + program_name: &str, + actual_program: &str, +) -> Result<(), anyhow::Error> { + let mut bpf_new = bpf.lock().expect("Cannot get value from lock"); + + // Load and attach the eBPF programs + let program: &mut KProbe = bpf_new + .program_mut(program_name) + .ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))? + .try_into() + .map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?; + + program + .load() + .map_err(|e| anyhow::anyhow!("Cannot load program: {}. Error: {}", &program_name, e))?; + + match program.attach(actual_program, 0) { + Ok(_) => info!("{} program attached successfully", actual_program), + Err(e) => { + error!("Error attaching {} program {:?}", actual_program, e); + return Err(anyhow::anyhow!( + "Failed to attach {}: {:?}", + actual_program, + e + )); + } + }; + + info!( + "eBPF program {} loaded and attached successfully", + program_name + ); + Ok(()) +} diff --git a/core/src/components/metrics/Cargo.toml b/core/src/components/metrics/Cargo.toml index 112872e8..0e88d8c0 100644 --- a/core/src/components/metrics/Cargo.toml +++ b/core/src/components/metrics/Cargo.toml @@ -7,11 +7,21 @@ edition = "2024" aya = "0.13.1" aya-log = "0.2.1" bytes = "1.4" -tokio = { version = "1.48.0", features = ["rt","macros","time","fs","signal","rt-multi-thread"] } +tokio = { version = "1.48.0", features = [ + "rt", + "macros", + "time", + "fs", + "signal", + "rt-multi-thread", +] } anyhow = "1.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } libc = "0.2.172" bytemuck = "1.23.0" -cortexbrain-common = { path = "../../../common" } -nix ={version="0.30.1",features=["net"]} +cortexbrain-common = { path = "../../../common", features = [ + "map-handlers", + "program-handlers", +] } +nix = { version = "0.30.1", features = ["net"] } diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index 1b4628e4..f519c7ea 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -1,24 +1,23 @@ -use aya::{maps::{ - perf::PerfEventArrayBuffer, Map, MapData, PerfEventArray - }, util::online_cpus}; +use aya::{ + maps::{Map, MapData, PerfEventArray, perf::PerfEventArrayBuffer}, + util::online_cpus, +}; use bytes::BytesMut; -use tokio::signal; -use std::{ - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, }; +use tokio::signal; -use tracing::{error, info}; +use tracing::{debug, error, info}; use crate::structs::NetworkMetrics; use crate::structs::TimeStampMetrics; pub async fn display_metrics_map( mut perf_buffers: Vec>, - running: Arc, // Changed to Arc + running: Arc, // Changed to Arc mut buffers: Vec, ) { info!("Starting metrics event listener..."); @@ -46,10 +45,23 @@ pub async fn display_metrics_map( let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size; info!( "tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}", - tgid, comm, ts_us, sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size + tgid, + comm, + ts_us, + sk_drop_count, + sk_err, + sk_err_soft, + sk_backlog_len, + sk_write_memory_queued, + sk_ack_backlog, + sk_receive_buffer_size ); } else { - info!("Received data too small: {} bytes, expected: {}", data.len(), std::mem::size_of::()); + info!( + "Received data too small: {} bytes, expected: {}", + data.len(), + std::mem::size_of::() + ); } } } @@ -65,7 +77,7 @@ pub async fn display_metrics_map( pub async fn display_time_stamp_events_map( mut perf_buffers: Vec>, - running: Arc, // Changed to Arc + running: Arc, // Changed to Arc mut buffers: Vec, ) { info!("Starting timestamp event listener..."); @@ -107,48 +119,67 @@ pub async fn display_time_stamp_events_map( info!("Timestamp event listener stopped"); } -pub async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> { +pub async fn event_listener(bpf_maps: Vec) -> Result<(), anyhow::Error> { info!("Getting CPU count..."); - let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?.len(); - info!("CPU count: {}", cpu_count); - + + let mut perf_event_arrays = Vec::new(); // contains a vector of PerfEventArrays + let mut event_buffers = Vec::new(); // contains a vector of buffers + info!("Creating perf buffers..."); - let mut net_perf_buffer: Vec> = Vec::new(); - let mut net_perf_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.0)?; - let mut time_stamp_events_perf_buffer: Vec> = Vec::new(); - let mut time_stamp_events_perf_array: PerfEventArray = - PerfEventArray::try_from(bpf_maps.1)?; - - info!("Opening perf buffers for {} CPUs...", cpu_count); - for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let buf: PerfEventArrayBuffer = net_perf_array.open(cpu_id, None)?; - net_perf_buffer.push(buf); + for map in bpf_maps { + debug!("Debugging map type:{:?}", map); + let perf_event_array = PerfEventArray::try_from(map).map_err(|e| { + error!("Cannot create perf_event_array for map.Reason: {}", e); + anyhow::anyhow!("Cannot create perf_event_array for map.Reason: {}", e) + })?; + perf_event_arrays.push(perf_event_array); // this is step 1 + let perf_event_array_buffer = Vec::new(); + event_buffers.push(perf_event_array_buffer); //this is step 2 } - for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let buf: PerfEventArrayBuffer = time_stamp_events_perf_array.open(cpu_id, None)?; - time_stamp_events_perf_buffer.push(buf); + + let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?; + + //info!("CPU count: {}", cpu_count); + for (perf_evt_array, perf_evt_array_buffer) in + perf_event_arrays.iter_mut().zip(event_buffers.iter_mut()) + { + for cpu_id in &cpu_count { + let single_buffer = perf_evt_array.open(*cpu_id, None)?; + perf_evt_array_buffer.push(single_buffer); + } } + + //info!("Opening perf buffers for {} CPUs...", cpu_count); info!("Perf buffers created successfully"); + let mut event_buffers = event_buffers.into_iter(); + + let time_stamp_events_perf_buffer = event_buffers.next().expect(""); + let net_perf_buffer = event_buffers.next().expect(""); // Create shared running flags let net_metrics_running = Arc::new(AtomicBool::new(true)); let time_stamp_events_running = Arc::new(AtomicBool::new(true)); - + // Create proper sized buffers - let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count]; - let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count]; - + let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()]; + let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()]; + // Clone for the signal handler let net_metrics_running_signal = net_metrics_running.clone(); let time_stamp_events_running_signal = time_stamp_events_running.clone(); - + info!("Starting event listener tasks..."); let metrics_map_displayer = tokio::spawn(async move { display_metrics_map(net_perf_buffer, net_metrics_running, net_metrics_buffers).await; }); let time_stamp_events_displayer = tokio::spawn(async move { - display_time_stamp_events_map(time_stamp_events_perf_buffer, time_stamp_events_running, time_stamp_events_buffers).await + display_time_stamp_events_map( + time_stamp_events_perf_buffer, + time_stamp_events_running, + time_stamp_events_buffers, + ) + .await }); info!("Event listeners started, entering main loop..."); @@ -176,4 +207,4 @@ pub async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> { // return success Ok(()) -} \ No newline at end of file +} diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index 6b22a865..9648e8a2 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -1,27 +1,18 @@ -use aya::{ - Ebpf -}; - +use anyhow::{Context, Ok}; +use aya::Ebpf; +use cortexbrain_common::{constants, logger}; use std::{ env, fs, path::Path, - sync::{ - Arc, Mutex, - }, + sync::{Arc, Mutex}, }; - -use anyhow::{Context, Ok}; use tracing::{error, info}; -use cortexbrain_common::{constants, logger}; mod helpers; -use crate::{helpers::event_listener, maps_handlers::map_pinner, program_handlers::load_and_attach_tcp_programs}; - -mod maps_handlers; -use crate::maps_handlers::init_ebpf_maps; +use crate::helpers::event_listener; -mod program_handlers; -use crate::program_handlers::load_program; +use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner}; +use cortexbrain_common::program_handlers::load_program; mod structs; @@ -33,41 +24,50 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting metrics service..."); info!("fetching data"); - let bpf_path = env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?; + let bpf_path = + env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?; let data = fs::read(Path::new(&bpf_path)).context("Failed to load file from path")?; let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?)); let tcp_bpf = bpf.clone(); let tcp_rev_bpf = bpf.clone(); + let tcp_v6_bpf = bpf.clone(); info!("Running Ebpf logger"); info!("loading programs"); - let bpf_map_save_path = - std::env::var(constants::PIN_MAP_PATH).context("PIN_MAP_PATH environment variable required")?; + let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) + .context("PIN_MAP_PATH environment variable required")?; - match init_ebpf_maps(bpf.clone()) { - std::result::Result::Ok(maps) => { + let map_data = vec!["time_stamp_events".to_string(), "net_metrics".to_string()]; + + match init_bpf_maps(bpf.clone(), map_data) { + std::result::Result::Ok(bpf_maps) => { info!("BPF maps loaded successfully"); let pin_path = std::path::PathBuf::from(&bpf_map_save_path); info!("About to call map_pinner with path: {:?}", pin_path); - match map_pinner(&maps, &pin_path).await { - std::result::Result::Ok(_) => { + match map_pinner(bpf_maps, &pin_path) { + std::result::Result::Ok(maps) => { info!("BPF maps pinned successfully to {}", bpf_map_save_path); { load_program(bpf.clone(), "metrics_tracer", "tcp_identify_packet_loss") - .context("An error occured during the execution of load_program function")?; - } - - { - load_and_attach_tcp_programs(tcp_bpf.clone()) - .context("An error occured during the execution of load_and_attach_tcp_programs function")?; + .context( + "An error occured during the execution of load_program function", + )?; + + load_program(tcp_bpf,"tcp_connect","tcp_v4_connect") + .context("An error occured during the execution of load_and_attach_tcp_programs function")?; + load_program(tcp_v6_bpf,"tcp_connect","tcp_v6_connect") + .context("An error occured during the execution of load_and_attach_tcp_programs function")?; + + load_program( + tcp_rev_bpf, + "tcp_rcv_state_process", + "tcp_rcv_state_process", + ) + .context( + "An error occured during the execution of load_program function", + )?; } - - { - load_program(tcp_rev_bpf.clone(), "tcp_rcv_state_process", "tcp_rcv_state_process") - .context("An error occured during the execution of load_program function")?; - } - event_listener(maps).await?; } Err(e) => { @@ -83,4 +83,4 @@ async fn main() -> Result<(), anyhow::Error> { } Ok(()) -} \ No newline at end of file +} diff --git a/core/src/components/metrics/src/maps_handlers.rs b/core/src/components/metrics/src/maps_handlers.rs deleted file mode 100644 index 12c3d0a2..00000000 --- a/core/src/components/metrics/src/maps_handlers.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::{path::PathBuf, sync::{Arc, Mutex}}; -use tokio::fs; -use anyhow::Error; -use aya::{maps::Map, Ebpf}; -use tracing::info; - - - -pub fn init_ebpf_maps(bpf: Arc>) -> Result<(Map, Map), anyhow::Error> { - // this function init the bpfs maps used in the main program - /* - index 0: net_metrics - index 1: time_stamp_events - */ - let mut bpf_new = bpf.lock().unwrap(); - - let net_metrics_map = bpf_new - .take_map("net_metrics") - .ok_or_else(|| anyhow::anyhow!("net_metrics map not found"))?; - - let time_stamps_events_map = bpf_new - .take_map("time_stamp_events") - .ok_or_else(|| anyhow::anyhow!("time_stamp_events map not found"))?; - - Ok((net_metrics_map, time_stamps_events_map)) -} - -pub async fn map_pinner(maps: &(Map, Map), path: &PathBuf) -> Result<(), Error> { - // check if the map exists - if !path.exists() { - info!("Pin path {:?} does not exist. Creating it...", path); - fs::create_dir_all(&path).await?; - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).await?; - } - } - - let map1_path = path.join("net_metrics"); - let map2_path = path.join("time_stamp_events"); - - // maps pinning - maps.0.pin(&map1_path)?; - maps.1.pin(&map2_path)?; - - Ok(()) -} diff --git a/core/src/components/metrics/src/mod.rs b/core/src/components/metrics/src/mod.rs index 8c4a839a..8414b63d 100644 --- a/core/src/components/metrics/src/mod.rs +++ b/core/src/components/metrics/src/mod.rs @@ -1,5 +1,3 @@ mod structs; mod enums; -mod map_handlers; -mod helpers; -mod program_handlers; \ No newline at end of file +mod helpers; \ No newline at end of file diff --git a/core/src/components/metrics/src/program_handlers.rs b/core/src/components/metrics/src/program_handlers.rs deleted file mode 100644 index 24d18cbd..00000000 --- a/core/src/components/metrics/src/program_handlers.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use aya::{programs::KProbe, Ebpf}; -use tracing::{info, error}; -use std::convert::TryInto; - -pub fn load_program(bpf: Arc>, program_name: &str, actual_program: &str) -> Result<(), anyhow::Error> { - let mut bpf_new = bpf.lock().unwrap(); - - // Load and attach the eBPF programs - let program: &mut KProbe = bpf_new - .program_mut(program_name) - .ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))? - .try_into() - .map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?; - - program.load()?; - - match program.attach(actual_program, 0) { - Ok(_) => info!("{} program attached successfully", actual_program), - Err(e) => { - error!("Error attaching {} program {:?}", actual_program, e); - return Err(anyhow::anyhow!("Failed to attach {}: {:?}", actual_program, e)); - } - }; - - info!("eBPF program {} loaded and attached successfully", program_name); - Ok(()) -} - -pub fn load_and_attach_tcp_programs(bpf: Arc>) -> Result<(), anyhow::Error> { - let mut bpf_new = bpf.lock().unwrap(); - - // Load and attach the eBPF programs - let tcp_prog: &mut KProbe = bpf_new - .program_mut("tcp_connect") - .ok_or_else(|| anyhow::anyhow!("Program tcp_connect not found"))? - .try_into() - .map_err(|e| anyhow::anyhow!("Failed to convert program tcp_connect: {:?}", e))?; - tcp_prog.load()?; - - match tcp_prog.attach("tcp_v4_connect", 0) { - Ok(_) => info!("tcp_v4_connect program attached successfully"), - Err(e) => { - error!("Error attaching tcp_v4_connect: {:?}", e); - return Err(anyhow::anyhow!("Failed to attach tcp_v4_connect: {:?}", e)); - } - }; - - match tcp_prog.attach("tcp_v6_connect", 0) { - Ok(_) => info!("tcp_v6_connect program attached successfully"), - Err(e) => { - error!("Error attaching tcp_v6_connect: {:?}", e); - return Err(anyhow::anyhow!("Failed to attach tcp_v6_connect: {:?}", e)); - } - }; - - Ok(()) -} \ No newline at end of file From d3a5342c057233eeb369664f50657268fa5458a1 Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Thu, 22 Jan 2026 23:05:29 +0100 Subject: [PATCH 4/6] [#168]: added load from program-handlers in identity user space implementation. Added a small doc in the conntracker/main.rs file --- core/common/src/program_handlers.rs | 4 +- core/src/components/conntracker/src/main.rs | 24 ++++++-- core/src/components/identity/Cargo.toml | 2 +- core/src/components/identity/src/helpers.rs | 3 + core/src/components/identity/src/main.rs | 67 +++------------------ 5 files changed, 34 insertions(+), 66 deletions(-) diff --git a/core/common/src/program_handlers.rs b/core/common/src/program_handlers.rs index 8832daff..5991befe 100644 --- a/core/common/src/program_handlers.rs +++ b/core/common/src/program_handlers.rs @@ -9,7 +9,9 @@ pub fn load_program( program_name: &str, actual_program: &str, ) -> Result<(), anyhow::Error> { - let mut bpf_new = bpf.lock().expect("Cannot get value from lock"); + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; // Load and attach the eBPF programs let program: &mut KProbe = bpf_new diff --git a/core/src/components/conntracker/src/main.rs b/core/src/components/conntracker/src/main.rs index 7a12642d..e723e4b4 100644 --- a/core/src/components/conntracker/src/main.rs +++ b/core/src/components/conntracker/src/main.rs @@ -29,14 +29,13 @@ use aya_ebpf::{ }; use crate::tc::try_identity_classifier; -use crate::veth_tracer::try_veth_tracer; use crate::tcp_analyzer::try_tcp_analyzer; - +use crate::veth_tracer::try_veth_tracer; // docs: // // virtual ethernet (veth) interface tracer: -// This function is triggered when a virtual ethernet interface is created +// This function is triggered when a virtual ethernet interface is created // #[kprobe] @@ -50,7 +49,7 @@ pub fn veth_creation_trace(ctx: ProbeContext) -> u32 { // docs: // // virtual ethernet (veth) interface tracer: -// This function is triggered when a virtual ethernet interface is deleted +// This function is triggered when a virtual ethernet interface is deleted // #[kprobe] @@ -94,14 +93,29 @@ pub fn identity_classifier(ctx: TcContext) -> i32 { // // this kprobe retrieves pid data and task id of an incoming packet +// this kprobe separation is needed because every kprobe program can be attached only one time. +// if you try to attach the same program the kernel returns this error: "Program is already attached" +// this is the reason why we have tcp_message_tracer_connect and tcp_message_tracer_rcv that are essentially the same functions +// but in the kernel space one is attached to the tcp_v4_connect kprobe and one to the tcp_v4_rcv kprobe +// TODO: a good addition to the library will be a function that check if the program is already attached: +// if the program is attached it creates a safe copy of the program to attach a second kernel symbol (kprobes) +// if the program is not attached we have the traditional behaviour (load the program + attach the program to the kernel symbol (kprobes)) + #[kprobe] -pub fn tcp_message_tracer(ctx: ProbeContext) -> u32 { +pub fn tcp_message_tracer_connect(ctx: ProbeContext) -> u32 { match try_tcp_analyzer(ctx) { Ok(ret_val) => ret_val, Err(ret_val) => ret_val.try_into().unwrap_or(1), } } +#[kprobe] +pub fn tcp_message_tracer_rcv(ctx: ProbeContext) -> u32 { + match try_tcp_analyzer(ctx) { + Ok(ret_val) => ret_val, + Err(ret_val) => ret_val.try_into().unwrap_or(1), + } +} //ref:https://elixir.bootlin.com/linux/v6.15.1/source/include/uapi/linux/ethtool.h#L536 //https://elixir.bootlin.com/linux/v6.15.1/source/drivers/net/veth.c#L268 diff --git a/core/src/components/identity/Cargo.toml b/core/src/components/identity/Cargo.toml index 3146991c..f5bdb378 100644 --- a/core/src/components/identity/Cargo.toml +++ b/core/src/components/identity/Cargo.toml @@ -32,7 +32,7 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } bytemuck = { version = "1.23.0", features = ["derive"] } bytemuck_derive = "1.10.1" -cortexbrain-common = { path = "../../../common/", features = ["map-handlers"] } +cortexbrain-common = { path = "../../../common/", features = ["map-handlers","program-handlers"] } nix = { version = "0.30.1", features = ["net"] } kube = { version = "2.0.1", features = ["client"] } k8s-openapi = { version = "0.26.0", features = ["v1_34"] } diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index 05b96032..95127893 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -52,6 +52,7 @@ pub async fn display_events>( //running: Arc, mut buffers: Vec, ) { + // FIXME: here maybe we need to use a loop with tokio::select while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { @@ -109,6 +110,7 @@ pub async fn display_veth_events>( mut buffers: Vec, mut link_ids: Arc>>, ) { + // FIXME: here maybe we need to use a loop with tokio::select while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { @@ -268,6 +270,7 @@ pub async fn display_tcp_registry_events>( //running: Arc, mut buffers: Vec, ) { + // FIXME: here maybe we need to use a loop with tokio::select while true { for buf in perf_buffers.iter_mut() { match buf.read_events(&mut buffers) { diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 9dd6ce94..ac4ed376 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -18,7 +18,7 @@ use crate::helpers::{ use aya::{ Ebpf, maps::{Map, perf::PerfEventArray}, - programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId}, + programs::{SchedClassifier, TcAttachType, tc::SchedClassifierLinkId}, util::online_cpus, }; @@ -27,6 +27,7 @@ use crate::helpers::scan_cgroup_cronjob; use bytes::BytesMut; use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; +use cortexbrain_common::program_handlers::load_program; use std::{ convert::TryInto, path::Path, @@ -65,7 +66,6 @@ async fn main() -> Result<(), anyhow::Error> { let data = vec![ "EventsMap".to_string(), "veth_identity_map".to_string(), - //"Blocklist".to_string(), "TcpPacketRegistry".to_string(), ]; match init_bpf_maps(bpf.clone(), data) { @@ -167,76 +167,25 @@ async fn init_tc_classifier( async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { //this functions init the veth_tracer used to make the InterfacesRegistry - - let mut bpf_new = bpf - .lock() - .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; - //creation tracer - let veth_creation_tracer: &mut KProbe = bpf_new - .program_mut("veth_creation_trace") - .ok_or_else(|| anyhow::anyhow!("program 'veth_creation_trace' not found"))? - .try_into()?; - veth_creation_tracer.load()?; - - match veth_creation_tracer.attach("register_netdevice", 0) { - std::result::Result::Ok(_) => info!("veth_creation_tracer program attached successfully"), - Err(e) => error!("Error attaching veth_creation_tracer program {:?}", e), - } - //deletion tracer - let veth_deletion_tracer: &mut KProbe = bpf_new - .program_mut("veth_deletion_trace") - .ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))? - .try_into()?; - veth_deletion_tracer - .load() - .context("Failed to load deletetion_tracer program")?; + load_program(bpf.clone(), "veth_creation_trace", "register_netdevice")?; - match veth_deletion_tracer.attach("unregister_netdevice_queue", 0) { - std::result::Result::Ok(_) => info!("veth_deletion_trace program attached successfully"), - Err(e) => error!("Error attaching veth_deletetion_trace program {:?}", e), - } + //deletion tracer + load_program(bpf, "veth_deletion_trace", "unregister_netdevice_queue")?; Ok(()) } async fn init_tcp_registry(bpf: Arc>) -> Result<(), anyhow::Error> { - let mut bpf_new = bpf - .lock() - .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; - // init tcp registry - let tcp_analyzer: &mut KProbe = bpf_new - .program_mut("tcp_message_tracer") - .ok_or_else(|| anyhow::anyhow!("program 'tcp_message_tracer' not found"))? - .try_into()?; - tcp_analyzer - .load() - .context("Failed to load tcp_message_tracer")?; + // .clone() increments the reference count of the shared Ebpf instance. + load_program(bpf.clone(), "tcp_message_tracer_rcv", "tcp_v4_rcv")?; info!("initializing tcp tracing functions"); - match tcp_analyzer.attach("tcp_v4_rcv", 0) { - std::result::Result::Ok(_) => { - info!("tcp_message_tracer attached successfully to the tcp_v4_rcv function ") - } - Err(e) => error!( - "Error attaching tcp_message_tracer to the tcp_v4_rcv function. Error: {:?}", - e - ), - } - - match tcp_analyzer.attach("tcp_v4_connect", 0) { - std::result::Result::Ok(_) => { - info!("tcp_message_tracer attached successfully to the tcp_v4_connect function ") - } - Err(e) => error!( - "Error attaching tcp_message_tracer to the tcp_v4_connect function. Error: {:?}", - e - ), - } + load_program(bpf, "tcp_message_tracer_connect", "tcp_v4_connect")?; Ok(()) } From b8449a36f6e2a001ccfe4a2ffb2791d7c890ee4c Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Fri, 23 Jan 2026 20:24:15 +0100 Subject: [PATCH 5/6] [#158]: fixed typos in the map names --- core/src/components/conntracker/src/data_structures.rs | 2 +- core/src/components/identity/src/main.rs | 2 +- core/src/testing/identity.yaml | 2 +- core/src/testing/metrics.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/components/conntracker/src/data_structures.rs b/core/src/components/conntracker/src/data_structures.rs index 35861a84..4de05cca 100644 --- a/core/src/components/conntracker/src/data_structures.rs +++ b/core/src/components/conntracker/src/data_structures.rs @@ -87,7 +87,7 @@ pub struct TcpPacketRegistry{ // -#[map(name = "EventsMap", pinning = "by_name")] +#[map(name = "events_map", pinning = "by_name")] pub static mut EVENTS: PerfEventArray = PerfEventArray::new(0); // FIXME: this might be useless diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index ac4ed376..b4f6d18a 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -64,7 +64,7 @@ async fn main() -> Result<(), anyhow::Error> { let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) .context("PIN_MAP_PATH environment variable required")?; let data = vec![ - "EventsMap".to_string(), + "events_map".to_string(), "veth_identity_map".to_string(), "TcpPacketRegistry".to_string(), ]; diff --git a/core/src/testing/identity.yaml b/core/src/testing/identity.yaml index bb027d2a..1b77d607 100644 --- a/core/src/testing/identity.yaml +++ b/core/src/testing/identity.yaml @@ -53,7 +53,7 @@ spec: - SYS_PTRACE containers: - name: identity - image: ghcr.io/cortexflow/identity:latest + image: lorenzotettamanti/cortexflow-identity:0.1.5-refcount7 command: ["/bin/bash", "-c"] args: - | diff --git a/core/src/testing/metrics.yaml b/core/src/testing/metrics.yaml index 4c775cab..1c1ecf83 100644 --- a/core/src/testing/metrics.yaml +++ b/core/src/testing/metrics.yaml @@ -19,7 +19,7 @@ spec: hostNetwork: true containers: - name: metrics - image: ghcr.io/cortexflow/metrics:latest + image: lorenzotettamanti/cortexflow-metrics:0.1.2-test8 command: ["/bin/bash", "-c"] args: - | From 52cab4c0090930796656ec7da9a576d7ecf366e0 Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Fri, 23 Jan 2026 22:35:12 +0100 Subject: [PATCH 6/6] [#158]: fixed bpf error: Error: the program is already loaded.Improved map handlers code --- core/common/src/map_handlers.rs | 5 ++++- core/src/components/identity/src/main.rs | 21 ++++++++++--------- core/src/components/metrics/src/helpers.rs | 16 +++++++------- core/src/components/metrics/src/main.rs | 4 ++-- .../src/components/metrics_tracer/src/main.rs | 8 ++++++- core/src/testing/identity.yaml | 2 +- core/src/testing/metrics.yaml | 2 +- 7 files changed, 34 insertions(+), 24 deletions(-) diff --git a/core/common/src/map_handlers.rs b/core/common/src/map_handlers.rs index 2882d66f..2e22736a 100644 --- a/core/common/src/map_handlers.rs +++ b/core/common/src/map_handlers.rs @@ -30,7 +30,10 @@ pub fn init_bpf_maps( bpf: Arc>, map_names: Vec, ) -> Result { - let mut bpf_new = bpf.lock().expect("Cannot get value from lock"); + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; + let mut maps = Vec::new(); // stores bpf_maps_objects for name in &map_names { diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index b4f6d18a..56f81d68 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -37,7 +37,7 @@ use std::{ use anyhow::{Context, Ok}; use cortexbrain_common::{constants, logger}; use tokio::{fs, signal}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use std::collections::HashMap; @@ -63,12 +63,13 @@ async fn main() -> Result<(), anyhow::Error> { let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?)); let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) .context("PIN_MAP_PATH environment variable required")?; - let data = vec![ + let map_data = vec![ "events_map".to_string(), "veth_identity_map".to_string(), "TcpPacketRegistry".to_string(), + "Blocklist".to_string(), ]; - match init_bpf_maps(bpf.clone(), data) { + match init_bpf_maps(bpf.clone(), map_data) { std::result::Result::Ok(bpf_maps) => { info!("Successfully loaded bpf maps"); let pin_path = std::path::PathBuf::from(&bpf_map_save_path); @@ -212,13 +213,13 @@ async fn event_listener( // create the PerfEventArrays and the buffers for map in bpf_maps { debug!("Debugging map type:{:?}", map); - let perf_event_array = PerfEventArray::try_from(map).map_err(|e| { - error!("Cannot create perf_event_array for map.Reason: {}", e); - anyhow::anyhow!("Cannot create perf_event_array for map.Reason: {}", e) - })?; - perf_event_arrays.push(perf_event_array); // this is step 1 - let perf_event_array_buffer = Vec::new(); - event_buffers.push(perf_event_array_buffer); //this is step 2 + if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) { + perf_event_arrays.push(perf_event_array); // this is step 1 + let perf_event_array_buffer = Vec::new(); + event_buffers.push(perf_event_array_buffer); //this is step 2 + } else { + warn!("Map is not a PerfEventArray, skipping load"); + } } // fill the input buffers with data from the PerfEventArrays diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index f519c7ea..a67b6074 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -10,7 +10,7 @@ use std::sync::{ }; use tokio::signal; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::structs::NetworkMetrics; use crate::structs::TimeStampMetrics; @@ -128,13 +128,13 @@ pub async fn event_listener(bpf_maps: Vec) -> Result<(), anyhow::Error> { info!("Creating perf buffers..."); for map in bpf_maps { debug!("Debugging map type:{:?}", map); - let perf_event_array = PerfEventArray::try_from(map).map_err(|e| { - error!("Cannot create perf_event_array for map.Reason: {}", e); - anyhow::anyhow!("Cannot create perf_event_array for map.Reason: {}", e) - })?; - perf_event_arrays.push(perf_event_array); // this is step 1 - let perf_event_array_buffer = Vec::new(); - event_buffers.push(perf_event_array_buffer); //this is step 2 + if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) { + perf_event_arrays.push(perf_event_array); // this is step 1 + let perf_event_array_buffer = Vec::new(); + event_buffers.push(perf_event_array_buffer); //this is step 2 + } else { + warn!("Map is not a PerfEventArray, skipping load"); + } } let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?; diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index 9648e8a2..e8677fb9 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -54,9 +54,9 @@ async fn main() -> Result<(), anyhow::Error> { "An error occured during the execution of load_program function", )?; - load_program(tcp_bpf,"tcp_connect","tcp_v4_connect") + load_program(tcp_bpf,"tcp_v4_connect","tcp_v4_connect") .context("An error occured during the execution of load_and_attach_tcp_programs function")?; - load_program(tcp_v6_bpf,"tcp_connect","tcp_v6_connect") + load_program(tcp_v6_bpf,"tcp_v6_connect","tcp_v6_connect") .context("An error occured during the execution of load_and_attach_tcp_programs function")?; load_program( diff --git a/core/src/components/metrics_tracer/src/main.rs b/core/src/components/metrics_tracer/src/main.rs index 2f5e5a14..216a6aca 100644 --- a/core/src/components/metrics_tracer/src/main.rs +++ b/core/src/components/metrics_tracer/src/main.rs @@ -78,7 +78,13 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { // Monitor on tcp_sendmsg, tcp_v4_connect #[kprobe] -fn tcp_connect(ctx: ProbeContext) -> u32 { +fn tcp_v6_connect(ctx: ProbeContext) -> u32 { + match on_connect(ctx) { Ok(_) => 0, Err(e) => e as u32 } +} + +// Monitor on tcp_sendmsg, tcp_v4_connect +#[kprobe] +fn tcp_v4_connect(ctx: ProbeContext) -> u32 { match on_connect(ctx) { Ok(_) => 0, Err(e) => e as u32 } } diff --git a/core/src/testing/identity.yaml b/core/src/testing/identity.yaml index 1b77d607..38bf1978 100644 --- a/core/src/testing/identity.yaml +++ b/core/src/testing/identity.yaml @@ -53,7 +53,7 @@ spec: - SYS_PTRACE containers: - name: identity - image: lorenzotettamanti/cortexflow-identity:0.1.5-refcount7 + image: lorenzotettamanti/cortexflow-identity:0.1.5-refcount9 command: ["/bin/bash", "-c"] args: - | diff --git a/core/src/testing/metrics.yaml b/core/src/testing/metrics.yaml index 1c1ecf83..262b28f7 100644 --- a/core/src/testing/metrics.yaml +++ b/core/src/testing/metrics.yaml @@ -19,7 +19,7 @@ spec: hostNetwork: true containers: - name: metrics - image: lorenzotettamanti/cortexflow-metrics:0.1.2-test8 + image: lorenzotettamanti/cortexflow-metrics:0.1.2-test12 command: ["/bin/bash", "-c"] args: - |