Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ repository = "https://github.com/CortexFlow/CortexBrain"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
anyhow = "1.0"
kube = { version = "2.0.1", features = ["client"] }
k8s-openapi = { version = "0.26.0", features = ["v1_34"] }
aya = "0.13.1"

[features]
map-handlers = []
program-handlers = []
6 changes: 5 additions & 1 deletion core/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub mod constants;
pub mod formatters;
pub mod logger;
pub mod formatters;
#[cfg(feature = "map-handlers")]
pub mod map_handlers;
#[cfg(feature = "program-handlers")]
pub mod program_handlers;
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,48 @@ use std::sync::Mutex;
use tracing::warn;
use tracing::{error, info};

pub fn init_bpf_maps(bpf: Arc<Mutex<Ebpf>>) -> Result<(Map, Map, Map, Map), anyhow::Error> {
// this function init the bpfs maps used in the main program
/*
index 0: events_map
index 1: veth_map
index 2: blocklist map
*/
let mut bpf_new = bpf.lock().unwrap();
// docs
//
// this function init the bpfs maps used in the main program
//
//

let events_map = bpf_new
.take_map("EventsMap")
.ok_or_else(|| anyhow::anyhow!("EventsMap map not found"))?;

let veth_map = bpf_new
.take_map("veth_identity_map")
.ok_or_else(|| anyhow::anyhow!("veth_identity_map map not found"))?;
#[cfg(feature = "map-handlers")]
pub struct BpfMapsData {
pub bpf_obj_names: Vec<String>,
pub bpf_obj_map: Vec<Map>,
}

let blocklist_map = bpf_new
.take_map("Blocklist")
.ok_or_else(|| anyhow::anyhow!("Blocklist map not found"))?;
#[cfg(feature = "map-handlers")]
pub fn init_bpf_maps(
bpf: Arc<Mutex<Ebpf>>,
map_names: Vec<String>,
) -> Result<BpfMapsData, anyhow::Error> {
let mut bpf_new = bpf
.lock()
.map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?;

let tcp_registry_map = bpf_new
.take_map("TcpPacketRegistry")
.ok_or_else(|| anyhow::anyhow!("TcpPacketRegistry map not found"))?;
let mut maps = Vec::new(); // stores bpf_maps_objects

Ok((events_map, veth_map, blocklist_map, tcp_registry_map))
for name in &map_names {
let bpf_map_init = bpf_new
.take_map(&name)
.ok_or_else(|| anyhow::anyhow!("{} map not found", &name))?;
maps.push(bpf_map_init);
}
Ok(BpfMapsData {
bpf_obj_names: map_names.clone(),
bpf_obj_map: maps,
})
}

//TODO: save bpf maps path in the cli metadata

//takes an array of bpf maps and pin them to persiste session data
//TODO: change maps type with a Vec<Map> instead of (Map,Map). This method is only for fast development and it's not optimized
//TODO: add bpf mounts during cli installation
pub fn map_pinner(maps: &(Map, Map, Map, Map), path: &PathBuf) -> Result<(), Error> {
// FIXME: is this ok that we are returning a BpfMapsData?

#[cfg(feature = "map-handlers")]
pub fn map_pinner(maps: BpfMapsData, path: &PathBuf) -> Result<Vec<Map>, Error> {
if !path.exists() {
info!("Pin path {:?} does not exist. Creating it...", path);
std::fs::create_dir_all(&path)?;
Expand All @@ -56,28 +65,32 @@ pub fn map_pinner(maps: &(Map, Map, Map, Map), path: &PathBuf) -> Result<(), Err
}
}

let configs = [
(&maps.0, "events_map"),
(&maps.1, "veth_map"),
(&maps.2, "blocklist_map"),
(&maps.3, "tcp_packet_registry"),
];

for (name, paths) in configs {
let map_path = path.join(paths);
let mut owned_maps = Vec::new(); // aya::Maps does not implement the clone trait i need to create a raw copy of the vec map
// an iterator that iterates two iterators simultaneously
for (map_obj, name) in maps
.bpf_obj_map
.into_iter()
.zip(maps.bpf_obj_names.into_iter())
{
let map_path = path.join(&name);
if map_path.exists() {
warn!("Path {} already exists", paths);
warn!("Removing path {}", paths);
let _ = std::fs::remove_file(&map_path);
warn!("Path {} already exists", name);
warn!("Removing path {}", name);
std::fs::remove_file(&map_path)?;
}
info!("Trying to pin map {:?} in map path: {:?}", name, &map_path);
name.pin(&map_path)?;
map_obj.pin(&map_path)?;
owned_maps.push(map_obj);
}

Ok(())
Ok(owned_maps)
}

#[cfg(feature = "map-handlers")]
pub async fn populate_blocklist(map: &mut Map) -> Result<(), Error> {
let client = Client::try_default().await.unwrap();
let client = Client::try_default()
.await
.expect("Cannot connect to Kubernetes Client");
let namespace = "cortexflow";
let configmap = "cortexbrain-client-config";

Expand Down
44 changes: 44 additions & 0 deletions core/common/src/program_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use aya::{Ebpf, programs::KProbe};
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use tracing::{error, info};

#[cfg(feature = "program-handlers")]
pub fn load_program(
bpf: Arc<Mutex<Ebpf>>,
program_name: &str,
actual_program: &str,
) -> Result<(), anyhow::Error> {
let mut bpf_new = bpf
.lock()
.map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?;

// Load and attach the eBPF programs
let program: &mut KProbe = bpf_new
.program_mut(program_name)
.ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))?
.try_into()
.map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?;

program
.load()
.map_err(|e| anyhow::anyhow!("Cannot load program: {}. Error: {}", &program_name, e))?;

match program.attach(actual_program, 0) {
Ok(_) => info!("{} program attached successfully", actual_program),
Err(e) => {
error!("Error attaching {} program {:?}", actual_program, e);
return Err(anyhow::anyhow!(
"Failed to attach {}: {:?}",
actual_program,
e
));
}
};

info!(
"eBPF program {} loaded and attached successfully",
program_name
);
Ok(())
}
2 changes: 1 addition & 1 deletion core/src/components/conntracker/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub struct TcpPacketRegistry{
//


#[map(name = "EventsMap", pinning = "by_name")]
#[map(name = "events_map", pinning = "by_name")]
pub static mut EVENTS: PerfEventArray<PacketLog> = PerfEventArray::new(0);

// FIXME: this might be useless
Expand Down
24 changes: 19 additions & 5 deletions core/src/components/conntracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ use aya_ebpf::{
};

use crate::tc::try_identity_classifier;
use crate::veth_tracer::try_veth_tracer;
use crate::tcp_analyzer::try_tcp_analyzer;

use crate::veth_tracer::try_veth_tracer;

// docs:
//
// virtual ethernet (veth) interface tracer:
// This function is triggered when a virtual ethernet interface is created
// This function is triggered when a virtual ethernet interface is created
//

#[kprobe]
Expand All @@ -50,7 +49,7 @@ pub fn veth_creation_trace(ctx: ProbeContext) -> u32 {
// docs:
//
// virtual ethernet (veth) interface tracer:
// This function is triggered when a virtual ethernet interface is deleted
// This function is triggered when a virtual ethernet interface is deleted
//

#[kprobe]
Expand Down Expand Up @@ -94,14 +93,29 @@ pub fn identity_classifier(ctx: TcContext) -> i32 {
//
// this kprobe retrieves pid data and task id of an incoming packet

// this kprobe separation is needed because every kprobe program can be attached only one time.
// if you try to attach the same program the kernel returns this error: "Program is already attached"
// this is the reason why we have tcp_message_tracer_connect and tcp_message_tracer_rcv that are essentially the same functions
// but in the kernel space one is attached to the tcp_v4_connect kprobe and one to the tcp_v4_rcv kprobe
// TODO: a good addition to the library will be a function that check if the program is already attached:
// if the program is attached it creates a safe copy of the program to attach a second kernel symbol (kprobes)
// if the program is not attached we have the traditional behaviour (load the program + attach the program to the kernel symbol (kprobes))

#[kprobe]
pub fn tcp_message_tracer(ctx: ProbeContext) -> u32 {
pub fn tcp_message_tracer_connect(ctx: ProbeContext) -> u32 {
match try_tcp_analyzer(ctx) {
Ok(ret_val) => ret_val,
Err(ret_val) => ret_val.try_into().unwrap_or(1),
}
}

#[kprobe]
pub fn tcp_message_tracer_rcv(ctx: ProbeContext) -> u32 {
match try_tcp_analyzer(ctx) {
Ok(ret_val) => ret_val,
Err(ret_val) => ret_val.try_into().unwrap_or(1),
}
}

//ref:https://elixir.bootlin.com/linux/v6.15.1/source/include/uapi/linux/ethtool.h#L536
//https://elixir.bootlin.com/linux/v6.15.1/source/drivers/net/veth.c#L268
Expand Down
8 changes: 3 additions & 5 deletions core/src/components/identity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ homepage = "https://docs.cortexflow.org"
repository = "https://github.com/CortexFlow/CortexBrain"

[features]
default = ["map-handlers", "struct", "enums"]
map-handlers = []
default = ["struct", "enums"]
struct = []
enums = []
experimental = ["map-handlers", "struct", "enums"]
experimental = ["struct", "enums"]


[dependencies]
Expand All @@ -31,10 +30,9 @@ tokio = { version = "1.48.0", features = [
anyhow = "1.0"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
libc = "0.2.172"
bytemuck = { version = "1.23.0", features = ["derive"] }
bytemuck_derive = "1.10.1"
cortexbrain-common = "0.1.0"
cortexbrain-common = { path = "../../../common/", features = ["map-handlers","program-handlers"] }
nix = { version = "0.30.1", features = ["net"] }
kube = { version = "2.0.1", features = ["client"] }
k8s-openapi = { version = "0.26.0", features = ["v1_34"] }
15 changes: 9 additions & 6 deletions core/src/components/identity/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ impl TryFrom<u8> for IpProtocols {
/* helper functions to read and log net events in the container */
pub async fn display_events<T: BorrowMut<MapData>>(
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
running: Arc<AtomicBool>,
//running: Arc<AtomicBool>,
mut buffers: Vec<BytesMut>,
) {
while running.load(Ordering::SeqCst) {
// FIXME: here maybe we need to use a loop with tokio::select
while true {
for buf in perf_buffers.iter_mut() {
match buf.read_events(&mut buffers) {
std::result::Result::Ok(events) => {
Expand Down Expand Up @@ -105,11 +106,12 @@ pub fn reverse_be_addr(addr: u32) -> Ipv4Addr {
pub async fn display_veth_events<T: BorrowMut<MapData>>(
bpf: Arc<Mutex<Bpf>>,
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
running: Arc<AtomicBool>,
//running: Arc<AtomicBool>,
mut buffers: Vec<BytesMut>,
mut link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
) {
while running.load(Ordering::SeqCst) {
// FIXME: here maybe we need to use a loop with tokio::select
while true {
for buf in perf_buffers.iter_mut() {
match buf.read_events(&mut buffers) {
std::result::Result::Ok(events) => {
Expand Down Expand Up @@ -265,10 +267,11 @@ async fn attach_detach_veth(
/* helper functions to display events from the TcpPacketRegistry structure */
pub async fn display_tcp_registry_events<T: BorrowMut<MapData>>(
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
running: Arc<AtomicBool>,
//running: Arc<AtomicBool>,
mut buffers: Vec<BytesMut>,
) {
while running.load(Ordering::SeqCst) {
// FIXME: here maybe we need to use a loop with tokio::select
while true {
for buf in perf_buffers.iter_mut() {
match buf.read_events(&mut buffers) {
std::result::Result::Ok(events) => {
Expand Down
3 changes: 1 addition & 2 deletions core/src/components/identity/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod helpers;
pub mod structs;
pub mod enums;
pub mod map_handlers;
pub mod enums;
Loading