From c05581662478950d4578d88d28b668590cea5c44 Mon Sep 17 00:00:00 2001 From: Didi Hoffmann Date: Thu, 12 Feb 2026 16:28:46 +0100 Subject: [PATCH 1/3] Small fixes --- src/energy_proc.c | 274 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 202 insertions(+), 72 deletions(-) diff --git a/src/energy_proc.c b/src/energy_proc.c index a12df94..7f1d7fb 100644 --- a/src/energy_proc.c +++ b/src/energy_proc.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -146,6 +147,16 @@ struct pid_metrics { struct rcu_head rcu; }; +struct missing_pid { + u32 tgid; + struct list_head node; +}; + +struct stale_pid { + struct pid_metrics *pm; + struct list_head node; +}; + static const struct rhashtable_params ht_params = { .key_len = sizeof(u32), .key_offset = offsetof(struct pid_metrics, pid), @@ -530,16 +541,71 @@ static u64 energy_model(const struct pid_metrics *e) /* ───────────────── Misc helpers ─────────────────────────────────────── */ -static bool pid_still_alive(u32 pid) +static struct task_struct *get_task_by_tgid(u32 tgid) +{ + struct task_struct *task; + + rcu_read_lock(); + task = pid_task(find_vpid(tgid), PIDTYPE_TGID); + if (task) + get_task_struct(task); + rcu_read_unlock(); + + return task; +} + +static int ensure_pid_entry(u32 tgid) { - bool alive; + struct pid_metrics *pm; + struct task_struct *task; rcu_read_lock(); - struct task_struct *p = pid_task(find_vpid(pid), PIDTYPE_PID); - alive = p && pid_alive(p); + pm = rhashtable_lookup_fast(&pid_ht, &tgid, ht_params); rcu_read_unlock(); + if (pm) + return 0; + + task = get_task_by_tgid(tgid); + if (!task) + return -ESRCH; + + pm = kmem_cache_zalloc(pm_cache, GFP_KERNEL); + if (!pm) { + put_task_struct(task); + return -ENOMEM; + } + + pm->pid = tgid; + get_task_comm(pm->comm, task); + pm->is_kernel = (task->flags & PF_KTHREAD) || !task->mm; + pm->alive = 1; + pm->last_seen = jiffies; + + if (rhashtable_insert_fast(&pid_ht, &pm->node, ht_params)) { + kmem_cache_free(pm_cache, pm); + put_task_struct(task); + return -EEXIST; + } + + if (pmu_supported) { + struct perf_event_attr attr = { + .type = PERF_TYPE_HARDWARE, + .config = PERF_COUNT_HW_INSTRUCTIONS, + .size = sizeof(attr), + .disabled = 1, + .exclude_hv = 1, + .inherit = 1, + }; + + pm->insn_evt = perf_event_create_kernel_counter(&attr, -1, task, NULL, NULL); + if (!IS_ERR(pm->insn_evt)) + perf_event_enable(pm->insn_evt); + else + pm->insn_evt = NULL; + } - return alive; + put_task_struct(task); + return 0; } // We keep this if we want to revert to fixed point representation @@ -644,6 +710,13 @@ static void collect_values(struct work_struct *wk) struct delayed_work *dw = to_delayed_work(wk); struct task_struct *p, *t; + struct rhashtable_iter iter; + struct pid_metrics *it; + struct missing_pid *missing, *missing_tmp; + struct stale_pid *stale, *stale_tmp; + LIST_HEAD(missing_pids); + LIST_HEAD(stale_pids); + LIST_HEAD(removed_pids); rapl_sample_once(); @@ -662,54 +735,65 @@ static void collect_values(struct work_struct *wk) sys_metrics.mem_bytes = sys_mapped_mem_bytes_read(); } - /* iterate tasks */ - for_each_process_thread(p, t) { - - rcu_read_lock(); - u32 tgid = task_tgid_nr(t); + /* discover tasks that do not have a pid entry yet */ + rcu_read_lock(); + for_each_process(p) { + u32 tgid = task_tgid_nr(p); struct pid_metrics *pm; pm = rhashtable_lookup_fast(&pid_ht, &tgid, ht_params); + if (pm) + continue; - if (!pm) { - pm = kmem_cache_zalloc(pm_cache, GFP_KERNEL); - if (!pm) - continue; + missing = kmalloc(sizeof(*missing), GFP_ATOMIC); + if (!missing) + continue; - pm->pid = tgid; - get_task_comm(pm->comm, p); + missing->tgid = tgid; + list_add_tail(&missing->node, &missing_pids); + } + rcu_read_unlock(); - if (rhashtable_insert_fast(&pid_ht, &pm->node, ht_params)) { - kmem_cache_free(pm_cache, pm); - continue; - } + list_for_each_entry_safe(missing, missing_tmp, &missing_pids, node) { + ensure_pid_entry(missing->tgid); + list_del(&missing->node); + kfree(missing); + } - if (pmu_supported) { - struct perf_event_attr attr = { - .type = PERF_TYPE_HARDWARE, - .config = PERF_COUNT_HW_INSTRUCTIONS, - .size = sizeof(attr), - .disabled = 1, - .exclude_hv = 1, - .inherit = 1, - }; - pm->insn_evt = - perf_event_create_kernel_counter( - &attr, -1, p, - NULL, NULL); - if (!IS_ERR(pm->insn_evt)) - perf_event_enable(pm->insn_evt); - else - pm->insn_evt = NULL; - } - pm->is_kernel = (p->flags & PF_KTHREAD) || !p->mm; + /* reset snapshot fields before rebuilding them from cumulative kernel stats */ + rhashtable_walk_enter(&pid_ht, &iter); + rhashtable_walk_start(&iter); + while ((it = rhashtable_walk_next(&iter))) { + if (IS_ERR(it)) { + if (PTR_ERR(it) == -EAGAIN) + continue; + break; } + it->alive = 0; + it->cpu_ns = 0; + it->wakeups = 0; + it->disk_read_bytes = 0; + it->disk_write_bytes = 0; + it->mem_bytes = 0; + } + rhashtable_walk_stop(&iter); + rhashtable_walk_exit(&iter); + + /* aggregate per-thread cumulative counters into process-level totals */ + rcu_read_lock(); + for_each_process_thread(p, t) { + u32 tgid = task_tgid_nr(t); + struct pid_metrics *pm; + + pm = rhashtable_lookup_fast(&pid_ht, &tgid, ht_params); + if (!pm) + continue; /* collect metrics */ pm->alive = 1; pm->last_seen = jiffies; - pm->cpu_ns += t->se.sum_exec_runtime; + pm->cpu_ns += t->se.sum_exec_runtime; #ifdef CONFIG_SCHEDSTATS pm->wakeups += t->stats.nr_wakeups; @@ -724,28 +808,35 @@ static void collect_values(struct work_struct *wk) pm->disk_read_bytes = 0; pm->disk_write_bytes = 0; #endif - - rcu_read_unlock(); } + rcu_read_unlock(); + /* snapshot memory via process leaders */ + rhashtable_walk_enter(&pid_ht, &iter); + rhashtable_walk_start(&iter); + while ((it = rhashtable_walk_next(&iter))) { + struct task_struct *task; + struct mm_struct *mm; - for_each_process(p) { - u32 tgid = task_tgid_nr(p); - struct pid_metrics *pm; - rcu_read_lock(); - pm = rhashtable_lookup_fast(&pid_ht, &tgid, ht_params); - rcu_read_unlock(); - if (!pm) continue; - - struct mm_struct *mm = get_task_mm(p); - if (mm) { pm->mem_bytes = (u64)get_mm_rss(mm) << PAGE_SHIFT; mmput(mm); } - else pm->mem_bytes = 0; - } + if (IS_ERR(it)) { + if (PTR_ERR(it) == -EAGAIN) + continue; + break; + } - /* lazy eviction */ - struct rhashtable_iter iter; - struct pid_metrics *it; + task = get_task_by_tgid(it->pid); + if (!task) + continue; + mm = get_task_mm(task); + if (mm) { + it->mem_bytes = (u64)get_mm_rss(mm) << PAGE_SHIFT; + mmput(mm); + } + put_task_struct(task); + } + rhashtable_walk_stop(&iter); + rhashtable_walk_exit(&iter); /* Instructions: read once from the process’ perf event */ rhashtable_walk_enter(&pid_ht, &iter); @@ -765,7 +856,7 @@ static void collect_values(struct work_struct *wk) rhashtable_walk_stop(&iter); rhashtable_walk_exit(&iter); - + /* evict stale pids that no longer exist */ rhashtable_walk_enter(&pid_ht, &iter); rhashtable_walk_start(&iter); while ((it = rhashtable_walk_next(&iter))) { @@ -774,13 +865,37 @@ static void collect_values(struct work_struct *wk) continue; break; } - if (it && !pid_still_alive(it->pid)){ - it->alive = 0; + if (it && !it->alive) { + stale = kmalloc(sizeof(*stale), GFP_KERNEL); + if (!stale) + continue; + stale->pm = it; + list_add_tail(&stale->node, &stale_pids); } } rhashtable_walk_stop(&iter); rhashtable_walk_exit(&iter); + list_for_each_entry_safe(stale, stale_tmp, &stale_pids, node) { + if (!rhashtable_remove_fast(&pid_ht, &stale->pm->node, ht_params)) + list_move_tail(&stale->node, &removed_pids); + else { + list_del(&stale->node); + kfree(stale); + } + } + + if (!list_empty(&removed_pids)) + synchronize_rcu(); + + list_for_each_entry_safe(stale, stale_tmp, &removed_pids, node) { + if (stale->pm->insn_evt) + perf_event_release_kernel(stale->pm->insn_evt); + kmem_cache_free(pm_cache, stale->pm); + list_del(&stale->node); + kfree(stale); + } + queue_delayed_work(pm_wq, dw, nsecs_to_jiffies(sample_ns)); } @@ -941,12 +1056,13 @@ static int cgroup_energy_show(struct seq_file *m, void *v){ cg_get_name(cgrp, cgname, sizeof(cgname)); //seq_printf(m, "cgroup: %s\n", cgname); - struct task_struct *p, *t; + struct task_struct *p; rcu_read_lock(); - for_each_process_thread(p, t) { - if (task_dfl_cgroup(t) == cgrp){ - struct pid_metrics *pm = rhashtable_lookup_fast(&pid_ht, &t->pid, ht_params); + for_each_process(p) { + if (task_dfl_cgroup(p) == cgrp){ + u32 tgid = task_tgid_nr(p); + struct pid_metrics *pm = rhashtable_lookup_fast(&pid_ht, &tgid, ht_params); if (pm && READ_ONCE(pm->alive)){ print_pm_window(m, pm); } @@ -1017,21 +1133,32 @@ static int __init pidmetrics_init(void) // Debugfs setup that just dumps everything. This is only accessible by root! dir = debugfs_create_dir("energy", NULL); - if (!dir) { - ret = -ENOMEM; + if (IS_ERR_OR_NULL(dir)) { + ret = IS_ERR(dir) ? PTR_ERR(dir) : -ENOMEM; + dir = NULL; goto err_trace; } - if (!debugfs_create_file("all", 0444, dir, NULL, &cnt_fops)) { - ret = -ENOMEM; - goto err_debugfs; + { + struct dentry *entry; + + entry = debugfs_create_file("all", 0444, dir, NULL, &cnt_fops); + if (IS_ERR_OR_NULL(entry)) { + ret = IS_ERR(entry) ? PTR_ERR(entry) : -ENOMEM; + goto err_debugfs; + } } pr_info(DRV_NAME ": created /sys/kernel/debug/energy/all\n"); - if (!debugfs_create_file("sys", 0444, dir, NULL, &sys_only_fops)) { - ret = -ENOMEM; - goto err_debugfs; + { + struct dentry *entry; + + entry = debugfs_create_file("sys", 0444, dir, NULL, &sys_only_fops); + if (IS_ERR_OR_NULL(entry)) { + ret = IS_ERR(entry) ? PTR_ERR(entry) : -ENOMEM; + goto err_debugfs; + } } pr_info(DRV_NAME ": created /sys/kernel/debug/energy/sys\n"); @@ -1104,6 +1231,7 @@ static int __init pidmetrics_init(void) rhashtable_destroy(&pid_ht); err_cache: kmem_cache_destroy(pm_cache); + sys_pmu_exit(); return ret; } @@ -1137,6 +1265,8 @@ static void __exit pidmetrics_exit(void) proc_remove(energy_dir); rhashtable_free_and_destroy(&pid_ht, free_pm, NULL); + rcu_barrier(); + kmem_cache_destroy(pm_cache); pr_info(DRV_NAME ": unloaded. Bye!\n"); } From 70e739d3346e391d46803efff11dd0f086f1857b Mon Sep 17 00:00:00 2001 From: Didi Hoffmann Date: Thu, 12 Feb 2026 18:38:02 +0100 Subject: [PATCH 2/3] non linear models --- README.md | 49 ++-- src/energy_proc.c | 246 ++++++++++++++--- tools/model.py | 653 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 767 insertions(+), 181 deletions(-) diff --git a/README.md b/README.md index 59152f1..b456545 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ This work has been made possible by the [Prototype Fund](https://www.prototypefu * **Per‑process metrics**: CPU time, RSS memory, disk I/O, network packets, context switch/wakeup counts and (optionally) retired instructions via PMU. * **Energy accounting**: Integrates Intel RAPL MSRs (`PP0`/core and `PSYS`) to compute energy usage in µJ. * **Dynamic sampling interval**: Run‑time adjustable via the `sample_ns` module parameter. -* **Weight‑based energy model**: Each metric has a tunable weight (`w_cpu_ns`, `w_mem_bytes`, …); the weighted sum is exported as `energy=` in each record. +* **Hybrid energy model**: A kernel-safe linear core (`w_*`) plus nonlinear lookup-table corrections for CPU, memory, disk, and network (`nl_*_lut_pmil`), with optional system idle offset (`w_sys_idle_uj`). * **Container aware**: `/proc/energy/cgroup` only shows processes in the caller’s default cgroup; `/proc/energy/all` and `debugfs/.../all` are root‑only. * **Low overhead**: Uses RCU look‑ups, rhashtable and per‑CPU workqueue. Sampling at 100 ms costs <0.3 % CPU on a 16‑core host. * **VM**: Works in VMs but accuracy will drop. @@ -123,7 +123,11 @@ echo 5000000 | sudo tee /sys/module/energy_proc/parameters/sample_ns sudo modprobe -r energy_proc sudo insmod energy_proc.ko w_net_rx_packets=2 ``` -> Each weight is multiplied by its metric and the sum is exposed as `energy=INT.FRAC` where FRAC has three decimal places (kilo‑scaling). +> `energy` is a weighted sum over normalized units: +> `cpu_ns/1e6`, `mem/1MiB`, `instructions/1e6`, `wakeups`, `disk_bytes/4096`, `net_packets`. +> The window model then applies per-resource LUT corrections: +> `nl_cpu_lut_pmil`, `nl_mem_lut_pmil`, `nl_disk_lut_pmil`, `nl_net_lut_pmil` (permille), +> and (for pid 0/system) adds `w_sys_idle_uj`. ## Windowing (decoupled sampling vs reporting) @@ -167,7 +171,7 @@ Field | Meaning It is imporant to not that we use pid 0 as the whole system and not the idle process. ## Data Collection Helper -The repository ships with **`energy-logger.sh`** which periodically dumps `/proc/energy/all` to disk for offline analysis (e.g. weight regression). +The repository ships with **`energy-logger.sh`** which periodically dumps `/sys/kernel/debug/energy/sys` to disk for offline analysis. 1. Pick a longer sampling interval to keep file size manageable: ```bash @@ -180,25 +184,37 @@ The repository ships with **`energy-logger.sh`** which periodically dumps `/proc ## Model -We use a linear model to calculate the energy score for each process. You should train this model yourself by +`tools/model.py` now trains three models from `/sys/kernel/debug/energy/sys` logs against `rapl_psys_sum_uj`: +- linear non-negative ridge baseline (`w_*`) +- nonlinear reference model (hist gradient boosting) +- distilled kernel model (`w_*` + `nl_*_lut_pmil` + `w_sys_idle_uj`) -1) `src` -1) running the data collection helper -``` -echo 100000000 > /sys/module/energy_proc/parameters/sample_ns +1. Collect training data: +```bash +echo 100000000 | sudo tee /sys/module/energy_proc/parameters/sample_ns +cd tools sudo ./energy-logger.sh ``` -1) `python3 -m venv venv` -1) `source venv/bin/activate` -1) `pip install -r requirements.txt` -1) `python3 model.py tmp/energy-XXXX.log` - -You can then add the weights to your kernel module by +2. Train: ```bash -echo 1231232 > /sys/module/energy_proc/parameters/PARAM # 100 ms +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt +python3 model.py /tmp/energy-XXXX.log --mode delta --alpha 10 +``` +3. Apply the printed params: +```bash +echo | sudo tee /sys/module/energy_proc/parameters/w_cpu_ns +echo | sudo tee /sys/module/energy_proc/parameters/w_sys_idle_uj +echo 900,950,1000,1000,1050,1100,1200,1300 | sudo tee /sys/module/energy_proc/parameters/nl_cpu_lut_pmil +echo 1000,1000,1000,1000,1000,1000,1000,1000 | sudo tee /sys/module/energy_proc/parameters/nl_mem_lut_pmil +echo 1000,1000,1000,1000,1000,1000,1000,1000 | sudo tee /sys/module/energy_proc/parameters/nl_disk_lut_pmil +echo 1000,1000,1000,1000,1000,1000,1000,1000 | sudo tee /sys/module/energy_proc/parameters/nl_net_lut_pmil ``` -Please remember that you can not add floats. We use fix decimal values with 3 decimal points. +Tips: +- Use multiple logs from different workloads in one training run for better generalization. +- Include CPU-, memory-, disk-, and network-heavy workloads in your training set so all LUTs are identifiable. ## Test @@ -219,4 +235,3 @@ sudo ./test.sh Patches and 🍻 are welcome! You can either contribute here on GitHub or drop me a message under didi@ribalba.de - diff --git a/src/energy_proc.c b/src/energy_proc.c index 7f1d7fb..cbdb250 100644 --- a/src/energy_proc.c +++ b/src/energy_proc.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,14 @@ #define DRV_NAME "energy_proc" #define PM_LINE_MAX 256 +#define ENERGY_CPU_NS_UNIT 1000000ULL /* 1 ms */ +#define ENERGY_MEM_BYTES_UNIT (1ULL << 20) /* 1 MiB */ +#define ENERGY_INSTR_UNIT 1000000ULL /* 1M instructions */ +#define ENERGY_WAKEUPS_UNIT 1ULL +#define ENERGY_DISK_BYTES_UNIT 4096ULL /* 4 KiB */ +#define ENERGY_NET_PKTS_UNIT 1ULL +#define NL_LUT_BINS 8 + #ifndef CGROUP_NAME_LEN #define CGROUP_NAME_LEN 128 #endif @@ -73,6 +82,19 @@ static u64 w_disk_read_bytes = 0; static u64 w_disk_write_bytes = 0; static u64 w_net_rx_packets = 0; static u64 w_net_tx_packets = 0; +static unsigned long long w_sys_idle_uj = 0; +static unsigned long long nl_cpu_lut_pmil[NL_LUT_BINS] = { + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000 +}; +static unsigned long long nl_mem_lut_pmil[NL_LUT_BINS] = { + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000 +}; +static unsigned long long nl_disk_lut_pmil[NL_LUT_BINS] = { + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000 +}; +static unsigned long long nl_net_lut_pmil[NL_LUT_BINS] = { + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000 +}; #define ENERGY_PARAM(_name) \ module_param(_name, ullong, 0644); \ @@ -86,6 +108,16 @@ ENERGY_PARAM(w_disk_read_bytes) ENERGY_PARAM(w_disk_write_bytes) ENERGY_PARAM(w_net_rx_packets) ENERGY_PARAM(w_net_tx_packets) +ENERGY_PARAM(w_sys_idle_uj) + +module_param_array(nl_cpu_lut_pmil, ullong, NULL, 0644); +MODULE_PARM_DESC(nl_cpu_lut_pmil, "CPU nonlinear per-bin multipliers in permille for window model"); +module_param_array(nl_mem_lut_pmil, ullong, NULL, 0644); +MODULE_PARM_DESC(nl_mem_lut_pmil, "Memory nonlinear per-bin multipliers in permille for window model"); +module_param_array(nl_disk_lut_pmil, ullong, NULL, 0644); +MODULE_PARM_DESC(nl_disk_lut_pmil, "Disk nonlinear per-bin multipliers in permille for window model"); +module_param_array(nl_net_lut_pmil, ullong, NULL, 0644); +MODULE_PARM_DESC(nl_net_lut_pmil, "Network nonlinear per-bin multipliers in permille for window model"); #undef ENERGY_PARAM @@ -139,6 +171,14 @@ struct pid_metrics { u64 window_mem_bytes; u64 window_energy_uj; u64 window_timestamp_ns; + bool window_ready; + u64 prev_cpu_ns; + u64 prev_instructions; + u64 prev_wakeups; + u64 prev_disk_read_bytes; + u64 prev_disk_write_bytes; + u64 prev_net_rx_packets; + u64 prev_net_tx_packets; /* internals */ struct rhash_head node; @@ -523,21 +563,112 @@ static int kp_sock_recvmsg_ret(struct kretprobe_instance *ri, static u64 energy_model(const struct pid_metrics *e) { - + u64 rx_packets = (u64)atomic64_read(&e->net_rx_packets); + u64 tx_packets = (u64)atomic64_read(&e->net_tx_packets); u64 score = 0; - score += e->cpu_ns * w_cpu_ns; - score += e->mem_bytes * w_mem_bytes; - score += (pmu_supported ? e->instructions : 0) * w_instructions; - score += e->wakeups * w_wakeups; - score += e->disk_read_bytes * w_disk_read_bytes; - score += e->disk_write_bytes * w_disk_write_bytes; - score += (u64)atomic64_read(&e->net_rx_packets) * w_net_rx_packets; - score += (u64)atomic64_read(&e->net_tx_packets) * w_net_tx_packets; + /* Legacy cumulative view for debug/all output */ + score += div_u64(e->cpu_ns, ENERGY_CPU_NS_UNIT) * w_cpu_ns; + score += div_u64(e->mem_bytes, ENERGY_MEM_BYTES_UNIT) * w_mem_bytes; + score += div_u64((pmu_supported ? e->instructions : 0), ENERGY_INSTR_UNIT) * w_instructions; + score += div_u64(e->wakeups, ENERGY_WAKEUPS_UNIT) * w_wakeups; + score += div_u64(e->disk_read_bytes, ENERGY_DISK_BYTES_UNIT) * w_disk_read_bytes; + score += div_u64(e->disk_write_bytes, ENERGY_DISK_BYTES_UNIT) * w_disk_write_bytes; + score += div_u64(rx_packets, ENERGY_NET_PKTS_UNIT) * w_net_rx_packets; + score += div_u64(tx_packets, ENERGY_NET_PKTS_UNIT) * w_net_tx_packets; return score; } +static inline u64 u64_delta(u64 now, u64 prev) +{ + return now >= prev ? now - prev : 0; +} + +struct energy_window_parts { + u64 cpu; + u64 mem; + u64 disk; + u64 net; +}; + +static void energy_model_window_parts(const struct pid_metrics *e, struct energy_window_parts *parts) +{ + parts->cpu = 0; + parts->mem = 0; + parts->disk = 0; + parts->net = 0; + + parts->cpu += div_u64(e->window_cpu_ns, ENERGY_CPU_NS_UNIT) * w_cpu_ns; + parts->cpu += div_u64((pmu_supported ? e->window_instructions : 0), ENERGY_INSTR_UNIT) * w_instructions; + parts->cpu += div_u64(e->window_wakeups, ENERGY_WAKEUPS_UNIT) * w_wakeups; + + parts->mem += div_u64(e->window_mem_bytes, ENERGY_MEM_BYTES_UNIT) * w_mem_bytes; + + parts->disk += div_u64(e->window_disk_read_bytes, ENERGY_DISK_BYTES_UNIT) * w_disk_read_bytes; + parts->disk += div_u64(e->window_disk_write_bytes, ENERGY_DISK_BYTES_UNIT) * w_disk_write_bytes; + + parts->net += div_u64(e->window_net_rx_packets, ENERGY_NET_PKTS_UNIT) * w_net_rx_packets; + parts->net += div_u64(e->window_net_tx_packets, ENERGY_NET_PKTS_UNIT) * w_net_tx_packets; +} + +static unsigned int cpu_util_bin(u64 cpu_ns_delta, u64 period_ns) +{ + u64 util_permille; + unsigned int bin; + + if (!period_ns) + return 0; + + util_permille = div_u64(cpu_ns_delta * 1000ULL, period_ns); + if (util_permille > 1000ULL) + util_permille = 1000ULL; + + bin = (unsigned int)div_u64(util_permille * NL_LUT_BINS, 1001ULL); + if (bin >= NL_LUT_BINS) + bin = NL_LUT_BINS - 1; + + return bin; +} + +static unsigned int activity_bin_log2(u64 activity_units) +{ + unsigned int pos; + + if (!activity_units) + return 0; + + pos = fls64(activity_units) - 1; + if (pos >= NL_LUT_BINS) + pos = NL_LUT_BINS - 1; + return pos; +} + +static u64 energy_model_window(const struct pid_metrics *e, bool include_system_idle) +{ + struct energy_window_parts parts; + u64 mem_mib = div_u64(e->window_mem_bytes, ENERGY_MEM_BYTES_UNIT); + u64 disk_4k = div_u64(e->window_disk_read_bytes + e->window_disk_write_bytes, ENERGY_DISK_BYTES_UNIT); + u64 net_pkts = e->window_net_rx_packets + e->window_net_tx_packets; + unsigned int cpu_bin = cpu_util_bin(e->window_cpu_ns, window_ns); + unsigned int mem_bin = activity_bin_log2(mem_mib); + unsigned int disk_bin = activity_bin_log2(disk_4k); + unsigned int net_bin = activity_bin_log2(net_pkts); + u64 scaled = 0; + + energy_model_window_parts(e, &parts); + + scaled += div_u64(parts.cpu * nl_cpu_lut_pmil[cpu_bin], 1000ULL); + scaled += div_u64(parts.mem * nl_mem_lut_pmil[mem_bin], 1000ULL); + scaled += div_u64(parts.disk * nl_disk_lut_pmil[disk_bin], 1000ULL); + scaled += div_u64(parts.net * nl_net_lut_pmil[net_bin], 1000ULL); + + if (include_system_idle) + scaled += w_sys_idle_uj; + + return scaled; +} + /* ───────────────── Misc helpers ─────────────────────────────────────── */ @@ -905,38 +1036,91 @@ static void calculate_window(struct work_struct *wk) struct rhashtable_iter iter; struct pid_metrics *it; + u64 now_ns = ktime_get_ns(); rhashtable_walk_enter(&pid_ht, &iter); rhashtable_walk_start(&iter); - while ((it = rhashtable_walk_next(&iter)) && !IS_ERR(it)){ + while ((it = rhashtable_walk_next(&iter))) { + if (IS_ERR(it)) { + if (PTR_ERR(it) == -EAGAIN) + continue; + break; + } if (it->alive == 1){ - it->window_cpu_ns = it->cpu_ns; - it->window_instructions = it->instructions; - it->window_wakeups = it->wakeups; - it->window_mem_bytes = it->mem_bytes; - it->window_disk_read_bytes = it->disk_read_bytes; - it->window_disk_write_bytes= it->disk_write_bytes; - it->window_net_rx_packets = (u64)atomic64_read(&it->net_rx_packets); - it->window_net_tx_packets = (u64)atomic64_read(&it->net_tx_packets); - - it->window_energy_uj = energy_model(it); - it->window_timestamp_ns = ktime_get_ns(); + u64 cur_rx = (u64)atomic64_read(&it->net_rx_packets); + u64 cur_tx = (u64)atomic64_read(&it->net_tx_packets); + + if (!it->window_ready) { + it->window_cpu_ns = 0; + it->window_instructions = 0; + it->window_wakeups = 0; + it->window_disk_read_bytes = 0; + it->window_disk_write_bytes = 0; + it->window_net_rx_packets = 0; + it->window_net_tx_packets = 0; + it->window_ready = true; + } else { + it->window_cpu_ns = u64_delta(it->cpu_ns, it->prev_cpu_ns); + it->window_instructions = u64_delta(it->instructions, it->prev_instructions); + it->window_wakeups = u64_delta(it->wakeups, it->prev_wakeups); + it->window_disk_read_bytes = u64_delta(it->disk_read_bytes, it->prev_disk_read_bytes); + it->window_disk_write_bytes = u64_delta(it->disk_write_bytes, it->prev_disk_write_bytes); + it->window_net_rx_packets = u64_delta(cur_rx, it->prev_net_rx_packets); + it->window_net_tx_packets = u64_delta(cur_tx, it->prev_net_tx_packets); + } + + it->window_mem_bytes = it->mem_bytes; + + it->prev_cpu_ns = it->cpu_ns; + it->prev_instructions = it->instructions; + it->prev_wakeups = it->wakeups; + it->prev_disk_read_bytes = it->disk_read_bytes; + it->prev_disk_write_bytes = it->disk_write_bytes; + it->prev_net_rx_packets = cur_rx; + it->prev_net_tx_packets = cur_tx; + + it->window_energy_uj = energy_model_window(it, false); + it->window_timestamp_ns = now_ns; } } rhashtable_walk_stop(&iter); rhashtable_walk_exit(&iter); - sys_metrics.window_cpu_ns = sys_metrics.cpu_ns; - sys_metrics.window_instructions = sys_metrics.instructions; - sys_metrics.window_wakeups = sys_metrics.wakeups; - sys_metrics.window_mem_bytes = sys_metrics.mem_bytes; - sys_metrics.window_disk_read_bytes = sys_metrics.disk_read_bytes; - sys_metrics.window_disk_write_bytes = sys_metrics.disk_write_bytes; - sys_metrics.window_net_rx_packets = (u64)atomic64_read(&sys_metrics.net_rx_packets); - sys_metrics.window_net_tx_packets = (u64)atomic64_read(&sys_metrics.net_tx_packets); - sys_metrics.window_energy_uj = energy_model(&sys_metrics); - sys_metrics.window_timestamp_ns = ktime_get_ns(); + { + u64 cur_rx = (u64)atomic64_read(&sys_metrics.net_rx_packets); + u64 cur_tx = (u64)atomic64_read(&sys_metrics.net_tx_packets); + + if (!sys_metrics.window_ready) { + sys_metrics.window_cpu_ns = 0; + sys_metrics.window_instructions = 0; + sys_metrics.window_wakeups = 0; + sys_metrics.window_disk_read_bytes = 0; + sys_metrics.window_disk_write_bytes = 0; + sys_metrics.window_net_rx_packets = 0; + sys_metrics.window_net_tx_packets = 0; + sys_metrics.window_ready = true; + } else { + sys_metrics.window_cpu_ns = u64_delta(sys_metrics.cpu_ns, sys_metrics.prev_cpu_ns); + sys_metrics.window_instructions = u64_delta(sys_metrics.instructions, sys_metrics.prev_instructions); + sys_metrics.window_wakeups = u64_delta(sys_metrics.wakeups, sys_metrics.prev_wakeups); + sys_metrics.window_disk_read_bytes = u64_delta(sys_metrics.disk_read_bytes, sys_metrics.prev_disk_read_bytes); + sys_metrics.window_disk_write_bytes = u64_delta(sys_metrics.disk_write_bytes, sys_metrics.prev_disk_write_bytes); + sys_metrics.window_net_rx_packets = u64_delta(cur_rx, sys_metrics.prev_net_rx_packets); + sys_metrics.window_net_tx_packets = u64_delta(cur_tx, sys_metrics.prev_net_tx_packets); + } + + sys_metrics.window_mem_bytes = sys_metrics.mem_bytes; + sys_metrics.prev_cpu_ns = sys_metrics.cpu_ns; + sys_metrics.prev_instructions = sys_metrics.instructions; + sys_metrics.prev_wakeups = sys_metrics.wakeups; + sys_metrics.prev_disk_read_bytes = sys_metrics.disk_read_bytes; + sys_metrics.prev_disk_write_bytes = sys_metrics.disk_write_bytes; + sys_metrics.prev_net_rx_packets = cur_rx; + sys_metrics.prev_net_tx_packets = cur_tx; + sys_metrics.window_energy_uj = energy_model_window(&sys_metrics, true); + sys_metrics.window_timestamp_ns = now_ns; + } queue_delayed_work(win_wq, dw, nsecs_to_jiffies(window_ns)); } diff --git a/tools/model.py b/tools/model.py index f7b1784..056fb51 100755 --- a/tools/model.py +++ b/tools/model.py @@ -1,147 +1,534 @@ #!/usr/bin/env python3 -from pathlib import Path +from __future__ import annotations + import argparse +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + import numpy as np import pandas as pd -from sklearn.linear_model import LinearRegression, Ridge -from sklearn.metrics import ( - r2_score, - mean_squared_error, - mean_absolute_error, - mean_absolute_percentage_error, -) -from sklearn.model_selection import train_test_split, KFold, cross_validate -from sklearn.metrics import mean_squared_error - -# ---- reuse helper functions you already have ----------------------------- -from parse_log import ( - parse_monitor_file, - remove_samples, - compute_deltas, -) -# -------------------------------------------------------------------------- - -FEATURES = [ - "cpu_ns", - "mem", - "instructions", - "wakeups", - "diski", - "disko", - "rx", - "tx", +from scipy.optimize import lsq_linear +from sklearn.ensemble import HistGradientBoostingRegressor +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score + + +NL_LUT_BINS = 8 + + +@dataclass(frozen=True) +class FeatureSpec: + raw_key: str + param_name: str + unit: float + is_counter: bool + + +FEATURES: list[FeatureSpec] = [ + FeatureSpec("cpu_ns", "w_cpu_ns", 1_000_000.0, True), # 1 ms + FeatureSpec("mem", "w_mem_bytes", float(1 << 20), False), # 1 MiB + FeatureSpec("instructions", "w_instructions", 1_000_000.0, True), # 1M instructions + FeatureSpec("wakeups", "w_wakeups", 1.0, True), + FeatureSpec("diski", "w_disk_read_bytes", 4096.0, True), # 4 KiB + FeatureSpec("disko", "w_disk_write_bytes", 4096.0, True), # 4 KiB + FeatureSpec("rx", "w_net_rx_packets", 1.0, True), + FeatureSpec("tx", "w_net_tx_packets", 1.0, True), ] -def aggregate_sample(sample: dict, use_rate: bool) -> dict: - """Collapse one timestamp block into a single feature row.""" - row = {f: 0.0 for f in FEATURES} - for metrics in sample["values"].values(): - for f in FEATURES: - row[f] += metrics[f] +TARGET_COL = "rapl_psys_sum_uj" +FEATURE_COLS = [f.param_name for f in FEATURES] +BASELINE_WEIGHTS = { + "w_cpu_ns": 5.0, + "w_mem_bytes": 0.0, + "w_instructions": 5.0, + "w_wakeups": 0.0, + "w_disk_read_bytes": 0.0, + "w_disk_write_bytes": 0.0, + "w_net_rx_packets": 0.0, + "w_net_tx_packets": 0.0, +} + +IDX = { + "cpu": [0, 2, 3], # cpu_ns + instructions + wakeups + "mem": [1], + "disk": [4, 5], # diski + disko + "net": [6, 7], # rx + tx +} + + +def _parse_numeric(value: str) -> int | str: + try: + return int(value, 10) + except ValueError: + return value + + +def _parse_pid_line(line: str) -> dict[str, int | str] | None: + if not line.startswith("pid="): + return None + + out: dict[str, int | str] = {} + for token in line.split(): + if "=" not in token: + continue + key, value = token.split("=", 1) + out[key] = _parse_numeric(value) + return out + + +def parse_monitor_file(path: Path) -> list[dict]: + blocks: list[dict] = [] + cur: dict = {"pids": {}} + + def flush_current() -> None: + nonlocal cur + if cur.get("timestamp") is not None and cur.get("pids"): + blocks.append(cur) + cur = {"pids": {}} + + with path.open("r", encoding="utf-8") as f: + for raw in f: + line = raw.strip() + if not line: + continue + if line.startswith("-------"): + flush_current() + continue + + pid_row = _parse_pid_line(line) + if pid_row is not None: + if "pid" in pid_row and isinstance(pid_row["pid"], int): + cur["pids"][pid_row["pid"]] = pid_row + continue + + if "=" in line: + key, value = line.split("=", 1) + cur[key] = _parse_numeric(value) + + flush_current() + return blocks + + +def blocks_to_system_df(blocks: list[dict], source: Path) -> pd.DataFrame: + rows: list[dict] = [] + for block in blocks: + pid0 = block.get("pids", {}).get(0) + if not pid0: + continue + if TARGET_COL not in block: + continue - if use_rate and sample.get("sample_ns"): - secs = sample["sample_ns"] / 1e9 - for f in FEATURES: - row[f] /= secs + row = { + "source": str(source), + "timestamp": block.get("timestamp"), + "sample_ns": block.get("sample_ns"), + TARGET_COL: block[TARGET_COL], + } - row["target"] = sample["rapl_psys_sum_uj"] - return row + for spec in FEATURES: + raw_val = pid0.get(spec.raw_key, 0) + if not isinstance(raw_val, int): + raw_val = 0 + row[spec.raw_key] = raw_val + rows.append(row) -def load_dataframe(paths: list[Path], deltas: bool, rate: bool) -> pd.DataFrame: - rows = [] + df = pd.DataFrame(rows) + if df.empty: + return df + return df.sort_values("timestamp").reset_index(drop=True) + + +def cpu_util_permille(cpu_delta_ns: pd.Series, period_ns: pd.Series) -> pd.Series: + denom = period_ns.clip(lower=1) + util = (cpu_delta_ns.astype(np.float64) * 1000.0) / denom.astype(np.float64) + return util.clip(lower=0.0, upper=1000.0) + + +def cpu_util_bin(util_permille: pd.Series) -> pd.Series: + bins = np.floor((util_permille * NL_LUT_BINS) / 1001.0).astype(int) + return bins.clip(lower=0, upper=NL_LUT_BINS - 1) + + +def activity_log2_bin(activity_units: pd.Series) -> pd.Series: + safe = np.maximum(activity_units.astype(np.float64), 1.0) + bins = np.floor(np.log2(safe)).astype(int) + bins[activity_units <= 0] = 0 + return bins.clip(lower=0, upper=NL_LUT_BINS - 1) + + +def build_training_rows( + system_df: pd.DataFrame, + mode: str, + min_target_uj: float, +) -> pd.DataFrame: + if system_df.empty: + return system_df + + if mode != "delta": + raise ValueError("Only --mode delta is supported") + + work = system_df.copy() + counter_cols = [f.raw_key for f in FEATURES if f.is_counter] + + work[f"d_{TARGET_COL}"] = work[TARGET_COL].diff() + for col in counter_cols: + work[f"d_{col}"] = work[col].diff() + + work = work.iloc[1:].copy() + work = work[work[f"d_{TARGET_COL}"] >= min_target_uj] + + for col in counter_cols: + work = work[work[f"d_{col}"] >= 0] + + out = pd.DataFrame(index=work.index) + out["timestamp"] = work["timestamp"] + out["sample_ns"] = work["sample_ns"] + out["target_uj"] = work[f"d_{TARGET_COL}"] + + for spec in FEATURES: + if spec.is_counter: + out[spec.param_name] = work[f"d_{spec.raw_key}"] / spec.unit + else: + out[spec.param_name] = work[spec.raw_key] / spec.unit # always include memory + + out["cpu_util_permille"] = cpu_util_permille(work["d_cpu_ns"], work["sample_ns"]) + out["cpu_bin"] = cpu_util_bin(out["cpu_util_permille"]) + + out["mem_activity"] = out["w_mem_bytes"] + out["disk_activity"] = out["w_disk_read_bytes"] + out["w_disk_write_bytes"] + out["net_activity"] = out["w_net_rx_packets"] + out["w_net_tx_packets"] + + out["mem_bin"] = activity_log2_bin(out["mem_activity"]) + out["disk_bin"] = activity_log2_bin(out["disk_activity"]) + out["net_bin"] = activity_log2_bin(out["net_activity"]) + + out = out.replace([np.inf, -np.inf], np.nan).dropna() + out = out[(out[FEATURE_COLS].sum(axis=1) > 0)] + return out.reset_index(drop=True) + + +def gather_rows(paths: Iterable[Path], mode: str, min_target_uj: float) -> pd.DataFrame: + rows: list[pd.DataFrame] = [] for path in paths: - samples = parse_monitor_file(path) - remove_samples(samples, "alive", True) - #remove_samples(samples, "kernel", False) - if deltas: - compute_deltas(samples) - rows.extend(aggregate_sample(s, rate) for s in samples) - return pd.DataFrame(rows) - -# -------------------------------------------------------------------------- - -def print_metrics(y_true, y_pred, label="test"): - rmse = np.sqrt(mean_squared_error(y_true, y_pred)) - mae = mean_absolute_error(y_true, y_pred) - mape = mean_absolute_percentage_error(y_true, y_pred) - r2 = r2_score(y_true, y_pred) - - print(f"\n# --- {label} metrics ---") - print(f"R² : {r2:7.4f}") - print(f"RMSE : {rmse:10.2f} μJ") - print(f"MAE : {mae:10.2f} μJ") - print(f"MAPE : {mape*100:7.2f} %") - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("logfiles", nargs="+", type=Path) - ap.add_argument("--deltas", action="store_true", - help="Use per-PID deltas (subtract previous sample)") - ap.add_argument("--rate", action="store_true", - help="Convert counters to per-second rates") - ap.add_argument("--ridge", action="store_true", - help="Use Ridge (L2) instead of plain OLS") - ap.add_argument("--alpha", type=float, default=1.0, - help="Ridge regularisation strength (ignored without --ridge)") - ap.add_argument("--test-frac", type=float, default=0.25, - help="Fraction of data reserved for the hold-out test set") - ap.add_argument("--cv", type=int, default=0, - help="If >0, run k-fold cross-validation instead of single split") - args = ap.parse_args() - - # ----- data ------------------------------------------------------------ - df = load_dataframe(args.logfiles, args.deltas, args.rate).dropna() - X = df[FEATURES].values.astype(np.float64) - y = df["target"].values.astype(np.float64) - - # ----- choose model ---------------------------------------------------- - if args.ridge: - model = Ridge(alpha=args.alpha, fit_intercept=True) - else: - model = LinearRegression(fit_intercept=True) - - # ======= option 1: k-fold CV =========================================== - if args.cv > 1: - cv = KFold(n_splits=args.cv, shuffle=True, random_state=42) - cv_results = cross_validate( - model, - X, - y, - cv=cv, - scoring={ - "r2": "r2", - "neg_rmse": "neg_root_mean_squared_error", - "neg_mae": "neg_mean_absolute_error", - "neg_mape": "neg_mean_absolute_percentage_error", - }, - return_train_score=False, - ) - print("\n# === cross-validation (k = %d) ===" % args.cv) - for metric, vals in cv_results.items(): - if metric.startswith("test_"): - name = metric[5:].lstrip("neg_") - scores = -vals if metric.startswith("test_neg") else vals - print(f"{name.upper():5s}: {scores.mean():.4f} ± {scores.std():.4f}") - # Fit on full data so we can output coefficients below - model.fit(X, y) - - # ======= option 2: single train/test split ============================= + blocks = parse_monitor_file(path) + system_df = blocks_to_system_df(blocks, path) + if system_df.empty: + continue + data = build_training_rows(system_df, mode, min_target_uj) + if not data.empty: + rows.append(data) + + if not rows: + return pd.DataFrame() + return pd.concat(rows, ignore_index=True) + + +def split_time(df: pd.DataFrame, test_frac: float) -> tuple[pd.DataFrame, pd.DataFrame]: + if not 0 < test_frac < 1: + raise ValueError("test-frac must be in (0, 1)") + cut = max(1, int(len(df) * (1.0 - test_frac))) + cut = min(cut, len(df) - 1) + return df.iloc[:cut].copy(), df.iloc[cut:].copy() + + +def trim_train(df: pd.DataFrame, quantile: float) -> pd.DataFrame: + if quantile >= 1.0: + return df + q = df["target_uj"].quantile(quantile) + return df[df["target_uj"] <= q] + + +def fit_nonnegative_ridge(X: np.ndarray, y: np.ndarray, alpha: float) -> np.ndarray: + if alpha < 0: + raise ValueError("alpha must be >= 0") + + if alpha > 0: + reg = np.sqrt(alpha) * np.eye(X.shape[1], dtype=np.float64) + X_aug = np.vstack([X, reg]) + y_aug = np.concatenate([y, np.zeros(X.shape[1], dtype=np.float64)]) else: - X_tr, X_te, y_tr, y_te = train_test_split( - X, y, test_size=args.test_frac, random_state=42, shuffle=True - ) - model.fit(X_tr, y_tr) - y_pred = model.predict(X_te) - print_metrics(y_te, y_pred) - - # ----- coefficients ---------------------------------------------------- - weights = dict(zip(FEATURES, model.coef_)) - bias = float(model.intercept_) - - print("\n# --- coefficients ---") - for k, v in weights.items(): - print(f"{k:14s} = {v}") - print(f"bias = {bias:.6e}") + X_aug = X + y_aug = y + + res = lsq_linear(X_aug, y_aug, bounds=(0, np.inf), method="trf", lsmr_tol="auto") + if not res.success: + raise RuntimeError(f"linear solver failed: {res.message}") + return res.x + + +def _group_base(X_lin: np.ndarray, w: np.ndarray, idxs: list[int]) -> np.ndarray: + return X_lin[:, idxs] @ w[idxs] + + +def fit_piecewise_distill_multigroup( + cpu_base: np.ndarray, + mem_base: np.ndarray, + disk_base: np.ndarray, + net_base: np.ndarray, + cpu_bins: np.ndarray, + mem_bins: np.ndarray, + disk_bins: np.ndarray, + net_bins: np.ndarray, + target: np.ndarray, +) -> tuple[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + n = len(target) + cols = 1 + 4 * NL_LUT_BINS + A = np.zeros((n, cols), dtype=np.float64) + A[:, 0] = 1.0 + + for i in range(NL_LUT_BINS): + A[cpu_bins == i, 1 + i] = cpu_base[cpu_bins == i] + A[mem_bins == i, 1 + NL_LUT_BINS + i] = mem_base[mem_bins == i] + A[disk_bins == i, 1 + 2 * NL_LUT_BINS + i] = disk_base[disk_bins == i] + A[net_bins == i, 1 + 3 * NL_LUT_BINS + i] = net_base[net_bins == i] + + res = lsq_linear(A, target, bounds=(0, np.inf), method="trf", lsmr_tol="auto") + if not res.success: + raise RuntimeError(f"distill solver failed: {res.message}") + + x = res.x + idle = float(x[0]) + cpu_mul = x[1:1 + NL_LUT_BINS] + mem_mul = x[1 + NL_LUT_BINS:1 + 2 * NL_LUT_BINS] + disk_mul = x[1 + 2 * NL_LUT_BINS:1 + 3 * NL_LUT_BINS] + net_mul = x[1 + 3 * NL_LUT_BINS:1 + 4 * NL_LUT_BINS] + return idle, cpu_mul, mem_mul, disk_mul, net_mul + + +def fill_sparse_bins(mults: np.ndarray, bins: np.ndarray, min_samples: int = 20) -> np.ndarray: + counts = np.bincount(bins, minlength=NL_LUT_BINS) + filled = mults.copy() + for i in range(NL_LUT_BINS): + if counts[i] >= min_samples and filled[i] > 0: + continue + left = i - 1 + right = i + 1 + repl = None + while left >= 0 or right < NL_LUT_BINS: + if left >= 0 and counts[left] >= min_samples and filled[left] > 0: + repl = filled[left] + break + if right < NL_LUT_BINS and counts[right] >= min_samples and filled[right] > 0: + repl = filled[right] + break + left -= 1 + right += 1 + filled[i] = repl if repl is not None else 1.0 + return filled + + +def piecewise_predict( + idle_uj: float, + cpu_base: np.ndarray, + mem_base: np.ndarray, + disk_base: np.ndarray, + net_base: np.ndarray, + cpu_bins: np.ndarray, + mem_bins: np.ndarray, + disk_bins: np.ndarray, + net_bins: np.ndarray, + cpu_mul: np.ndarray, + mem_mul: np.ndarray, + disk_mul: np.ndarray, + net_mul: np.ndarray, +) -> np.ndarray: + return ( + idle_uj + + cpu_base * cpu_mul[cpu_bins] + + mem_base * mem_mul[mem_bins] + + disk_base * disk_mul[disk_bins] + + net_base * net_mul[net_bins] + ) + + +def metric_bundle(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float]: + rmse = float(np.sqrt(mean_squared_error(y_true, y_pred))) + mae = float(mean_absolute_error(y_true, y_pred)) + r2 = float(r2_score(y_true, y_pred)) + mask = np.abs(y_true) > 1e-9 + mape = float(np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100.0) if np.any(mask) else float("nan") + return {"r2": r2, "rmse": rmse, "mae": mae, "mape_pct": mape} + + +def print_metrics(label: str, metrics: dict[str, float]) -> None: + print(f"\n# --- {label} ---") + print(f"R2 : {metrics['r2']:.5f}") + print(f"RMSE : {metrics['rmse']:.3f} uJ") + print(f"MAE : {metrics['mae']:.3f} uJ") + print(f"MAPE : {metrics['mape_pct']:.3f} %" if np.isfinite(metrics["mape_pct"]) else "MAPE : n/a") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Train nonlinear PSYS model and distill to kernel-friendly linear+multi-LUT params." + ) + parser.add_argument("logfiles", nargs="+", type=Path) + parser.add_argument("--mode", choices=("delta",), default="delta") + parser.add_argument("--alpha", type=float, default=10.0, help="L2 strength for non-negative linear fit") + parser.add_argument("--test-frac", type=float, default=0.2) + parser.add_argument("--min-target-uj", type=float, default=1.0) + parser.add_argument("--trim-upper-quantile", type=float, default=0.999) + parser.add_argument("--random-seed", type=int, default=42) + args = parser.parse_args() + + for path in args.logfiles: + if not path.exists(): + raise FileNotFoundError(path) + + df = gather_rows(args.logfiles, args.mode, args.min_target_uj) + if df.empty or len(df) < 40: + raise RuntimeError("Not enough usable rows (need >= 40 after filtering)") + + train_df, test_df = split_time(df, args.test_frac) + train_df = trim_train(train_df, args.trim_upper_quantile) + if train_df.empty or test_df.empty: + raise RuntimeError("Train/test split produced an empty set") + + X_train_lin = train_df[FEATURE_COLS].to_numpy(dtype=np.float64) + y_train = train_df["target_uj"].to_numpy(dtype=np.float64) + X_test_lin = test_df[FEATURE_COLS].to_numpy(dtype=np.float64) + y_test = test_df["target_uj"].to_numpy(dtype=np.float64) + + w = fit_nonnegative_ridge(X_train_lin, y_train, args.alpha) + pred_lin_train = X_train_lin @ w + pred_lin_test = X_test_lin @ w + + baseline_vec = np.array([BASELINE_WEIGHTS[k] for k in FEATURE_COLS], dtype=np.float64) + pred_baseline_test = X_test_lin @ baseline_vec + + X_train_nl = np.column_stack([ + X_train_lin, + train_df["cpu_util_permille"].to_numpy(dtype=np.float64), + train_df["mem_activity"].to_numpy(dtype=np.float64), + train_df["disk_activity"].to_numpy(dtype=np.float64), + train_df["net_activity"].to_numpy(dtype=np.float64), + ]) + X_test_nl = np.column_stack([ + X_test_lin, + test_df["cpu_util_permille"].to_numpy(dtype=np.float64), + test_df["mem_activity"].to_numpy(dtype=np.float64), + test_df["disk_activity"].to_numpy(dtype=np.float64), + test_df["net_activity"].to_numpy(dtype=np.float64), + ]) + + nl_model = HistGradientBoostingRegressor( + loss="squared_error", + learning_rate=0.05, + max_iter=600, + max_depth=6, + min_samples_leaf=20, + l2_regularization=1.0, + random_state=args.random_seed, + ) + nl_model.fit(X_train_nl, y_train) + pred_nl_train = np.clip(nl_model.predict(X_train_nl), a_min=0.0, a_max=None) + pred_nl_test = np.clip(nl_model.predict(X_test_nl), a_min=0.0, a_max=None) + + cpu_base_train = _group_base(X_train_lin, w, IDX["cpu"]) + mem_base_train = _group_base(X_train_lin, w, IDX["mem"]) + disk_base_train = _group_base(X_train_lin, w, IDX["disk"]) + net_base_train = _group_base(X_train_lin, w, IDX["net"]) + + cpu_base_test = _group_base(X_test_lin, w, IDX["cpu"]) + mem_base_test = _group_base(X_test_lin, w, IDX["mem"]) + disk_base_test = _group_base(X_test_lin, w, IDX["disk"]) + net_base_test = _group_base(X_test_lin, w, IDX["net"]) + + cpu_bins_train = train_df["cpu_bin"].to_numpy(dtype=int) + mem_bins_train = train_df["mem_bin"].to_numpy(dtype=int) + disk_bins_train = train_df["disk_bin"].to_numpy(dtype=int) + net_bins_train = train_df["net_bin"].to_numpy(dtype=int) + + cpu_bins_test = test_df["cpu_bin"].to_numpy(dtype=int) + mem_bins_test = test_df["mem_bin"].to_numpy(dtype=int) + disk_bins_test = test_df["disk_bin"].to_numpy(dtype=int) + net_bins_test = test_df["net_bin"].to_numpy(dtype=int) + + idle_uj, cpu_mul, mem_mul, disk_mul, net_mul = fit_piecewise_distill_multigroup( + cpu_base=cpu_base_train, + mem_base=mem_base_train, + disk_base=disk_base_train, + net_base=net_base_train, + cpu_bins=cpu_bins_train, + mem_bins=mem_bins_train, + disk_bins=disk_bins_train, + net_bins=net_bins_train, + target=pred_nl_train, + ) + + cpu_mul = fill_sparse_bins(cpu_mul, cpu_bins_train) + mem_mul = fill_sparse_bins(mem_mul, mem_bins_train) + disk_mul = fill_sparse_bins(disk_mul, disk_bins_train) + net_mul = fill_sparse_bins(net_mul, net_bins_train) + + pred_distill_train = piecewise_predict( + idle_uj, cpu_base_train, mem_base_train, disk_base_train, net_base_train, + cpu_bins_train, mem_bins_train, disk_bins_train, net_bins_train, + cpu_mul, mem_mul, disk_mul, net_mul, + ) + pred_distill_test = piecewise_predict( + idle_uj, cpu_base_test, mem_base_test, disk_base_test, net_base_test, + cpu_bins_test, mem_bins_test, disk_bins_test, net_bins_test, + cpu_mul, mem_mul, disk_mul, net_mul, + ) + + print(f"# mode={args.mode} alpha={args.alpha}") + print(f"# rows_total={len(df)} rows_train={len(train_df)} rows_test={len(test_df)}") + + print_metrics("test metrics (current module defaults)", metric_bundle(y_test, pred_baseline_test)) + print_metrics("test metrics (linear non-negative ridge)", metric_bundle(y_test, pred_lin_test)) + print_metrics("test metrics (nonlinear reference)", metric_bundle(y_test, pred_nl_test)) + print_metrics("test metrics (distilled kernel model)", metric_bundle(y_test, pred_distill_test)) + print_metrics("train metrics (distilled kernel model)", metric_bundle(y_train, pred_distill_train)) + + int_weights: dict[str, int] = {} + print("\n# --- fitted linear weights (float) ---") + for spec, value in zip(FEATURES, w): + print(f"{spec.param_name:20s} = {value:.6f}") + int_weights[spec.param_name] = max(0, int(np.rint(value))) + + w_sys_idle_uj = max(0, int(np.rint(idle_uj))) + nl_cpu_lut_pmil = np.clip(np.rint(cpu_mul * 1000.0), 0, 1000000).astype(int) + nl_mem_lut_pmil = np.clip(np.rint(mem_mul * 1000.0), 0, 1000000).astype(int) + nl_disk_lut_pmil = np.clip(np.rint(disk_mul * 1000.0), 0, 1000000).astype(int) + nl_net_lut_pmil = np.clip(np.rint(net_mul * 1000.0), 0, 1000000).astype(int) + + print("\n# --- distilled nonlinear params ---") + print(f"w_sys_idle_uj = {w_sys_idle_uj}") + print("nl_cpu_lut_pmil = " + ",".join(str(v) for v in nl_cpu_lut_pmil.tolist())) + print("nl_mem_lut_pmil = " + ",".join(str(v) for v in nl_mem_lut_pmil.tolist())) + print("nl_disk_lut_pmil = " + ",".join(str(v) for v in nl_disk_lut_pmil.tolist())) + print("nl_net_lut_pmil = " + ",".join(str(v) for v in nl_net_lut_pmil.tolist())) + + print("\n# --- module params (rounded ints) ---") + for spec in FEATURES: + print(f"{spec.param_name:20s} = {int_weights[spec.param_name]}") + + params = " ".join(f"{k}={v}" for k, v in int_weights.items()) + cpu_lut = ",".join(str(v) for v in nl_cpu_lut_pmil.tolist()) + mem_lut = ",".join(str(v) for v in nl_mem_lut_pmil.tolist()) + disk_lut = ",".join(str(v) for v in nl_disk_lut_pmil.tolist()) + net_lut = ",".join(str(v) for v in nl_net_lut_pmil.tolist()) + + print("\n# reload example") + print( + "sudo insmod energy_proc.ko " + f"{params} w_sys_idle_uj={w_sys_idle_uj} " + f"nl_cpu_lut_pmil={cpu_lut} " + f"nl_mem_lut_pmil={mem_lut} " + f"nl_disk_lut_pmil={disk_lut} " + f"nl_net_lut_pmil={net_lut}" + ) + + print("\n# runtime update example") + for key, value in int_weights.items(): + print(f"echo {value} | sudo tee /sys/module/energy_proc/parameters/{key}") + print(f"echo {w_sys_idle_uj} | sudo tee /sys/module/energy_proc/parameters/w_sys_idle_uj") + print(f"echo {cpu_lut} | sudo tee /sys/module/energy_proc/parameters/nl_cpu_lut_pmil") + print(f"echo {mem_lut} | sudo tee /sys/module/energy_proc/parameters/nl_mem_lut_pmil") + print(f"echo {disk_lut} | sudo tee /sys/module/energy_proc/parameters/nl_disk_lut_pmil") + print(f"echo {net_lut} | sudo tee /sys/module/energy_proc/parameters/nl_net_lut_pmil") + if __name__ == "__main__": main() From 2c22c0206e1bbdfe84b520d7b4441915ac9c73e1 Mon Sep 17 00:00:00 2001 From: Didi Hoffmann Date: Fri, 13 Feb 2026 11:24:57 +0100 Subject: [PATCH 3/3] Adds the benchmark and smaller fixes --- README.md | 2 + tools/benchmark.sh | 194 +++++++++++++++++++++++++++++++++++++++++ tools/model.py | 50 +++++++++-- tools/requirements.txt | 20 ++--- 4 files changed, 247 insertions(+), 19 deletions(-) create mode 100755 tools/benchmark.sh diff --git a/README.md b/README.md index b456545..88f8220 100644 --- a/README.md +++ b/README.md @@ -215,6 +215,8 @@ echo 1000,1000,1000,1000,1000,1000,1000,1000 | sudo tee /sys/module/energy_proc/ Tips: - Use multiple logs from different workloads in one training run for better generalization. - Include CPU-, memory-, disk-, and network-heavy workloads in your training set so all LUTs are identifiable. +- `--trim-upper-quantile` now trims both train and test using the train quantile cutoff (helps ignore extreme outliers). +- The linear fit is two-stage (CPU/mem first, then disk+net on residuals) to avoid disk/net weights collapsing to zero. ## Test diff --git a/tools/benchmark.sh b/tools/benchmark.sh new file mode 100755 index 0000000..65f4268 --- /dev/null +++ b/tools/benchmark.sh @@ -0,0 +1,194 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'USAGE' +Usage: ./benchmark.sh [options] + +Runs a mixed workload to exercise CPU, memory, disk, network, and wakeups +while you collect energy logs. Run energy-logger.sh in another terminal. + +Options: + --duration SEC seconds per phase (default: 20) + --rounds N repeat all phases N times (default: 1) + --tmpdir DIR directory for disk payloads (default: /tmp/procpower-bench) + --file-mb MB size of disk IO file in MB (default: 512) + --mem-mb MB memory stress size in MB (default: 512) + --cpu-workers N CPU worker processes (default: nproc) + --wakeup-threads N wakeup threads (default: nproc) + --net-url URL download URL for network phase + -h, --help show this help + +Notes: + - For real disk IO, set --tmpdir to a disk-backed path (not tmpfs). + - Network phase downloads from the internet; ensure you have connectivity. +USAGE +} + +have() { command -v "$1" >/dev/null 2>&1; } + +DURATION=20 +ROUNDS=1 +TMPDIR="/tmp/procpower-bench" +FILE_MB=512 +MEM_MB=512 +CPU_WORKERS="$(nproc)" +WAKEUP_THREADS="$(nproc)" +NET_URL="https://speed.hetzner.de/100MB.bin" + +while [[ $# -gt 0 ]]; do + case "$1" in + --duration) DURATION="$2"; shift 2 ;; + --rounds) ROUNDS="$2"; shift 2 ;; + --tmpdir) TMPDIR="$2"; shift 2 ;; + --file-mb) FILE_MB="$2"; shift 2 ;; + --mem-mb) MEM_MB="$2"; shift 2 ;; + --cpu-workers) CPU_WORKERS="$2"; shift 2 ;; + --wakeup-threads) WAKEUP_THREADS="$2"; shift 2 ;; + --net-url) NET_URL="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "Unknown arg: $1" >&2; usage; exit 1 ;; + esac +done + +if ! have timeout; then + echo "timeout not found; install coreutils." >&2 + exit 1 +fi + +mkdir -p "$TMPDIR" +IO_FILE="$TMPDIR/bench-io.bin" + +cleanup() { + rm -f "$IO_FILE" +} +trap cleanup EXIT + +phase() { + echo + echo "== $1 ==" +} + +run_timeout() { + # timeout exits 124 on expected time limit; don't treat as failure + timeout "$@" || { + status=$? + if [[ $status -ne 124 ]]; then + return $status + fi + } +} + +cpu_phase() { + phase "cpu" + if have stress-ng; then + stress-ng --cpu "$CPU_WORKERS" --cpu-method matrixprod --timeout "${DURATION}s" --metrics-brief + return + fi + DURATION="$DURATION" CPU_WORKERS="$CPU_WORKERS" python3 - <<'PY' +import math, os, time +import multiprocessing as mp + +duration = float(os.environ["DURATION"]) +workers = int(os.environ["CPU_WORKERS"]) + +def burn(): + x = 0.0001 + end = time.time() + duration + while time.time() < end: + x = math.sin(x) * math.cos(x) + 1.000001 + +procs = [mp.Process(target=burn) for _ in range(workers)] +for p in procs: p.start() +for p in procs: p.join() +PY +} + +mem_phase() { + phase "memory" + if have stress-ng; then + stress-ng --vm 1 --vm-bytes "${MEM_MB}M" --timeout "${DURATION}s" --metrics-brief + return + fi + DURATION="$DURATION" MEM_MB="$MEM_MB" python3 - <<'PY' +import os, time + +duration = float(os.environ["DURATION"]) +size = int(os.environ["MEM_MB"]) * 1024 * 1024 +buf = bytearray(size) +end = time.time() + duration +step = 4096 +while time.time() < end: + for i in range(0, len(buf), step): + buf[i] = (buf[i] + 1) & 0xFF +PY +} + +disk_write_phase() { + phase "disk write" + local count=$((FILE_MB / 4)) + if (( count < 1 )); then count=1; fi + run_timeout "${DURATION}s" bash -c "while :; do dd if=/dev/zero of='$IO_FILE' bs=4M count=$count conv=fdatasync status=none; done" +} + +disk_read_phase() { + phase "disk read" + if [[ ! -f "$IO_FILE" ]]; then + echo "disk file missing; skipping read phase" + return + fi + local count=$((FILE_MB / 4)) + if (( count < 1 )); then count=1; fi + run_timeout "${DURATION}s" bash -c "while :; do dd if='$IO_FILE' of=/dev/null bs=4M count=$count status=none; done" +} + +net_phase() { + phase "network (remote download)" + if have curl; then + run_timeout "${DURATION}s" bash -c "while :; do curl -sSfL --output /dev/null '${NET_URL}'; done" + return + fi + if have wget; then + run_timeout "${DURATION}s" bash -c "while :; do wget -q -O /dev/null '${NET_URL}'; done" + return + fi + echo "curl/wget not found; skipping network phase" +} + +wakeups_phase() { + phase "wakeups" + if have stress-ng; then + if stress-ng --switch "$WAKEUP_THREADS" --timeout "${DURATION}s" --metrics-brief; then + return + fi + echo "stress-ng wakeup stressor failed; falling back to Python" + fi + DURATION="$DURATION" WAKEUP_THREADS="$WAKEUP_THREADS" python3 - <<'PY' +import os, time, threading + +duration = float(os.environ["DURATION"]) +threads = int(os.environ["WAKEUP_THREADS"]) +end = time.time() + duration + +def worker(): + while time.time() < end: + time.sleep(0.001) + +ts = [threading.Thread(target=worker) for _ in range(threads)] +for t in ts: t.start() +for t in ts: t.join() +PY +} + +for ((i=1; i<=ROUNDS; i++)); do + echo "Round $i/$ROUNDS" + cpu_phase + mem_phase + disk_write_phase + disk_read_phase + net_phase + wakeups_phase +done + +echo +echo "Benchmark complete." diff --git a/tools/model.py b/tools/model.py index 056fb51..8c91452 100755 --- a/tools/model.py +++ b/tools/model.py @@ -223,19 +223,30 @@ def gather_rows(paths: Iterable[Path], mode: str, min_target_uj: float) -> pd.Da return pd.concat(rows, ignore_index=True) -def split_time(df: pd.DataFrame, test_frac: float) -> tuple[pd.DataFrame, pd.DataFrame]: +def split_random(df: pd.DataFrame, test_frac: float, seed: int) -> tuple[pd.DataFrame, pd.DataFrame]: if not 0 < test_frac < 1: raise ValueError("test-frac must be in (0, 1)") + rng = np.random.default_rng(seed) + idx = rng.permutation(len(df)) cut = max(1, int(len(df) * (1.0 - test_frac))) cut = min(cut, len(df) - 1) - return df.iloc[:cut].copy(), df.iloc[cut:].copy() + train_idx = idx[:cut] + test_idx = idx[cut:] + return df.iloc[train_idx].copy(), df.iloc[test_idx].copy() -def trim_train(df: pd.DataFrame, quantile: float) -> pd.DataFrame: +def trim_by_quantile( + train_df: pd.DataFrame, + test_df: pd.DataFrame, + quantile: float, +) -> tuple[pd.DataFrame, pd.DataFrame]: if quantile >= 1.0: - return df - q = df["target_uj"].quantile(quantile) - return df[df["target_uj"] <= q] + return train_df, test_df + q = train_df["target_uj"].quantile(quantile) + return ( + train_df[train_df["target_uj"] <= q], + test_df[test_df["target_uj"] <= q], + ) def fit_nonnegative_ridge(X: np.ndarray, y: np.ndarray, alpha: float) -> np.ndarray: @@ -256,6 +267,25 @@ def fit_nonnegative_ridge(X: np.ndarray, y: np.ndarray, alpha: float) -> np.ndar return res.x +def fit_two_stage_nonnegative_ridge( + X: np.ndarray, + y: np.ndarray, + alpha: float, + stage1_idx: list[int], + stage2_idx: list[int], +) -> np.ndarray: + w = np.zeros(X.shape[1], dtype=np.float64) + X1 = X[:, stage1_idx] + w1 = fit_nonnegative_ridge(X1, y, alpha) + w[stage1_idx] = w1 + residual = y - X1 @ w1 + + X2 = X[:, stage2_idx] + w2 = fit_nonnegative_ridge(X2, residual, alpha) + w[stage2_idx] = w2 + return w + + def _group_base(X_lin: np.ndarray, w: np.ndarray, idxs: list[int]) -> np.ndarray: return X_lin[:, idxs] @ w[idxs] @@ -379,8 +409,8 @@ def main() -> None: if df.empty or len(df) < 40: raise RuntimeError("Not enough usable rows (need >= 40 after filtering)") - train_df, test_df = split_time(df, args.test_frac) - train_df = trim_train(train_df, args.trim_upper_quantile) + train_df, test_df = split_random(df, args.test_frac, args.random_seed) + train_df, test_df = trim_by_quantile(train_df, test_df, args.trim_upper_quantile) if train_df.empty or test_df.empty: raise RuntimeError("Train/test split produced an empty set") @@ -389,7 +419,9 @@ def main() -> None: X_test_lin = test_df[FEATURE_COLS].to_numpy(dtype=np.float64) y_test = test_df["target_uj"].to_numpy(dtype=np.float64) - w = fit_nonnegative_ridge(X_train_lin, y_train, args.alpha) + stage1_idx = sorted(IDX["cpu"] + IDX["mem"]) + stage2_idx = sorted(IDX["disk"] + IDX["net"]) + w = fit_two_stage_nonnegative_ridge(X_train_lin, y_train, args.alpha, stage1_idx, stage2_idx) pred_lin_train = X_train_lin @ w pred_lin_test = X_test_lin @ w diff --git a/tools/requirements.txt b/tools/requirements.txt index 68c496a..71a3cb9 100644 --- a/tools/requirements.txt +++ b/tools/requirements.txt @@ -1,10 +1,10 @@ -joblib==1.5.1 -numpy==2.3.2 -pandas==2.3.1 -python-dateutil==2.9.0.post0 -pytz==2025.2 -scikit-learn==1.7.1 -scipy==1.16.1 -six==1.17.0 -threadpoolctl==3.6.0 -tzdata==2025.2 +joblib +numpy +pandas +python-dateutil +pytz +scikit-learn +scipy +six +threadpoolctl +tzdata