diff --git a/.sqlx/query-2016043d0b61918b44b219291c8662decd88fbb21085b1f792803b2eb353e47b.json b/.sqlx/query-2016043d0b61918b44b219291c8662decd88fbb21085b1f792803b2eb353e47b.json new file mode 100644 index 0000000000..9c7703efa3 --- /dev/null +++ b/.sqlx/query-2016043d0b61918b44b219291c8662decd88fbb21085b1f792803b2eb353e47b.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM vpn_client_session WHERE created_at < $1 AND (device_id, location_id, created_at) NOT IN ( SELECT device_id, location_id, MAX(created_at) FROM vpn_client_session GROUP BY device_id, location_id)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamp" + ] + }, + "nullable": [] + }, + "hash": "2016043d0b61918b44b219291c8662decd88fbb21085b1f792803b2eb353e47b" +} diff --git a/.sqlx/query-4218fd109bd4a17b2a4551cfe0d14f2c9b6a37f25f0696d83078dae2c9f87c5f.json b/.sqlx/query-4218fd109bd4a17b2a4551cfe0d14f2c9b6a37f25f0696d83078dae2c9f87c5f.json deleted file mode 100644 index 5354828a78..0000000000 --- a/.sqlx/query-4218fd109bd4a17b2a4551cfe0d14f2c9b6a37f25f0696d83078dae2c9f87c5f.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM wireguard_peer_stats WHERE collected_at < $1 AND (device_id, network, collected_at) NOT IN ( SELECT device_id, network, MAX(collected_at) FROM wireguard_peer_stats GROUP BY device_id, network)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamp" - ] - }, - "nullable": [] - }, - "hash": "4218fd109bd4a17b2a4551cfe0d14f2c9b6a37f25f0696d83078dae2c9f87c5f" -} diff --git a/Cargo.lock b/Cargo.lock index c19bab3a57..cdccff64e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -596,9 +596,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.54" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ "find-msvc-tools", "jobserver", @@ -1135,6 +1135,7 @@ dependencies = [ "defguard_proxy_manager", "defguard_session_manager", "defguard_version", + "defguard_vpn_stats_purge", "dotenvy", "secrecy", "tokio", @@ -1402,6 +1403,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "defguard_vpn_stats_purge" +version = "0.0.0" +dependencies = [ + "chrono", + "defguard_common", + "humantime", + "sqlx", + "tokio", + "tracing", +] + [[package]] name = "defguard_web_ui" version = "0.0.0" @@ -1821,9 +1834,9 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "fixedbitset" @@ -4639,9 +4652,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -4867,7 +4880,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.0", + "schemars 1.2.1", "serde_core", "serde_json", "serde_with_macros", @@ -5037,9 +5050,9 @@ checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "slug" @@ -6958,18 +6971,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.36" +version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dafd85c832c1b68bbb4ec0c72c7f6f4fc5179627d2bc7c26b30e4c0cc11e76cc" +checksum = "7456cf00f0685ad319c5b1693f291a650eaf345e941d082fc4e03df8a03996ac" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.36" +version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cb7e4e8436d9db52fbd6625dbf2f45243ab84994a72882ec8227b99e72b439a" +checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0" dependencies = [ "proc-macro2", "quote", @@ -7072,9 +7085,9 @@ checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" [[package]] name = "zmij" -version = "1.0.17" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" [[package]] name = "zopfli" diff --git a/Cargo.toml b/Cargo.toml index e2efd0769c..676a1baf3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ defguard_proto = { path = "./crates/defguard_proto", version = "0.0.0" } defguard_proxy_manager = { path = "./crates/defguard_proxy_manager", version = "0.0.0" } defguard_session_manager = { path = "./crates/defguard_session_manager", version = "0.0.0" } defguard_version = { path = "./crates/defguard_version", version = "0.0.0" } +defguard_vpn_stats_purge = { path = "./crates/defguard_vpn_stats_purge", version = "0.0.0" } defguard_web_ui = { path = "./crates/defguard_web_ui", version = "0.0.0" } defguard_certs = { path = "./crates/defguard_certs", version = "0.0.0" } model_derive = { path = "./crates/model_derive", version = "0.0.0" } diff --git a/crates/defguard/Cargo.toml b/crates/defguard/Cargo.toml index 777ec89353..b9ded3c80f 100644 --- a/crates/defguard/Cargo.toml +++ b/crates/defguard/Cargo.toml @@ -17,6 +17,7 @@ defguard_mail = { workspace = true } defguard_proxy_manager = { workspace = true } defguard_session_manager = { workspace = true } defguard_version = { workspace = true } +defguard_vpn_stats_purge = { workspace = true } defguard_certs = { workspace = true } # external dependencies diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index 7c9a0fc7cc..3f6c1dbef5 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -34,13 +34,13 @@ use defguard_core::{ init_dev_env, init_vpn_location, run_web_server, utility_thread::run_utility_thread, version::IncompatibleComponents, - wireguard_stats_purge::run_periodic_stats_purge, }; use defguard_event_logger::{message::EventLoggerMessage, run_event_logger}; use defguard_event_router::{RouterReceiverSet, run_event_router}; use defguard_mail::{Mail, run_mail_handler}; use defguard_proxy_manager::{ProxyManager, ProxyTxSet}; use defguard_session_manager::{events::SessionManagerEvent, run_session_manager}; +use defguard_vpn_stats_purge::run_periodic_stats_purge; use secrecy::ExposeSecret; use tokio::sync::{ broadcast, diff --git a/crates/defguard_common/src/db/models/wireguard_peer_stats.rs b/crates/defguard_common/src/db/models/wireguard_peer_stats.rs index 099f89229f..e6d964ac68 100644 --- a/crates/defguard_common/src/db/models/wireguard_peer_stats.rs +++ b/crates/defguard_common/src/db/models/wireguard_peer_stats.rs @@ -1,12 +1,8 @@ -use std::time::Duration; - -use chrono::{DateTime, NaiveDateTime, TimeDelta, Utc}; -use humantime::format_duration; +use chrono::NaiveDateTime; use ipnetwork::IpNetwork; use model_derive::Model; use serde::{Deserialize, Serialize}; -use sqlx::{PgExecutor, PgPool, query, query_as, query_scalar}; -use tracing::{debug, info}; +use sqlx::{PgPool, query_as}; use crate::db::{Id, NoId}; @@ -28,90 +24,6 @@ pub struct WireguardPeerStats { pub allowed_ips: Option, } -impl WireguardPeerStats { - /// Delete stats older than a configured threshold. - /// This is done to prevent unnecessary table growth. - /// At least one record is retained for each device and network combination, - /// even when older than set threshold. - pub async fn purge_old_stats( - pool: &PgPool, - stats_purge_threshold: Duration, - ) -> Result<(), sqlx::Error> { - let start = Utc::now(); - info!( - "Purging stats older than {}", - format_duration(stats_purge_threshold) - ); - - let threshold = (Utc::now() - - TimeDelta::from_std(stats_purge_threshold).expect("Failed to parse duration")) - .naive_utc(); - let result = query!( - "DELETE FROM wireguard_peer_stats \ - WHERE collected_at < $1 \ - AND (device_id, network, collected_at) NOT IN ( \ - SELECT device_id, network, MAX(collected_at) \ - FROM wireguard_peer_stats \ - GROUP BY device_id, network)", - threshold - ) - .execute(pool) - .await?; - - let end = Utc::now(); - let rows_count = result.rows_affected(); - - info!("Removed {rows_count} old records from wireguard_peer_stats",); - - // Store successful stats purge in database. - Self::record_stats_purge(pool, start, end, threshold, rows_count as i64).await?; - - Ok(()) - } - - // Check how much time has elapsed since last recorded stats purge - pub async fn time_since_last_purge<'e, E>(executor: E) -> Result, sqlx::Error> - where - E: PgExecutor<'e>, - { - debug!("Checking time since last stats purge"); - - let timestamp = query_scalar!("SELECT MAX(started_at) FROM wireguard_stats_purge") - .fetch_one(executor) - .await?; - - match timestamp { - Some(timestamp) => { - let time_since = Utc::now().signed_duration_since(timestamp.and_utc()); - let time_since = time_since.to_std().expect("Failed to parse duration"); - debug!( - "Time since last stats purge: {}", - format_duration(time_since) - ); - Ok(Some(time_since)) - } - None => Ok(None), - } - } - - async fn record_stats_purge<'e, E>( - executor: E, - start: DateTime, - end: DateTime, - removal_threshold: NaiveDateTime, - records_removed: i64, - ) -> Result<(), sqlx::Error> - where - E: PgExecutor<'e>, - { - debug!("Recording successful stats purge in database"); - query!("INSERT INTO wireguard_stats_purge (started_at, finished_at, removal_threshold, records_removed) VALUES ($1, $2, $3, $4)", - start.naive_utc(), end.naive_utc(), removal_threshold, records_removed).execute(executor).await?; - - Ok(()) - } -} - impl WireguardPeerStats { pub async fn fetch_latest( conn: &PgPool, @@ -167,6 +79,8 @@ impl WireguardPeerStats { #[cfg(test)] mod test { + use chrono::Utc; + use super::*; #[test] diff --git a/crates/defguard_core/src/lib.rs b/crates/defguard_core/src/lib.rs index deadada189..4bcdf7d2a7 100644 --- a/crates/defguard_core/src/lib.rs +++ b/crates/defguard_core/src/lib.rs @@ -178,7 +178,6 @@ pub mod user_management; pub mod utility_thread; pub mod version; pub mod wg_config; -pub mod wireguard_stats_purge; #[macro_use] extern crate tracing; diff --git a/crates/defguard_core/src/wireguard_stats_purge.rs b/crates/defguard_core/src/wireguard_stats_purge.rs deleted file mode 100644 index ff36e1da16..0000000000 --- a/crates/defguard_core/src/wireguard_stats_purge.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::time::Duration; - -use chrono::{TimeDelta, Utc}; -use defguard_common::db::models::wireguard_peer_stats::WireguardPeerStats; -use humantime::format_duration; -use sqlx::PgPool; -use tokio::time::sleep; - -// How long to sleep between loop iterations -const PURGE_LOOP_SLEEP: Duration = Duration::from_secs(300); // 5 minutes - -#[instrument(skip_all)] -pub async fn run_periodic_stats_purge( - pool: PgPool, - stats_purge_frequency: Duration, - stats_purge_threshold: Duration, -) -> Result<(), sqlx::Error> { - info!( - "Starting periodic purge of stats older than {} every {}", - format_duration(stats_purge_threshold), - format_duration(stats_purge_frequency) - ); - - loop { - debug!("Checking if stats purge should be executed"); - // check time elapsed since last purge - let time_since_last_purge = WireguardPeerStats::time_since_last_purge(&pool).await?; - if match time_since_last_purge { - Some(time_since) => time_since >= stats_purge_frequency, - None => true, - } { - // perform purge - info!("Executing stats purge"); - match WireguardPeerStats::purge_old_stats(&pool, stats_purge_threshold).await { - Ok(()) => { - let next_purge_timestamp = (Utc::now() - + TimeDelta::from_std(stats_purge_frequency) - .expect("Failed to parse duration")) - .naive_utc(); - info!( - "Stats purge successful. Next purge will be executed at {next_purge_timestamp}" - ); - } - Err(err) => { - error!("Error while purging stats: {err}"); - } - } - } - - // wait till next iteration - debug!("Sleeping until next iteration"); - sleep(PURGE_LOOP_SLEEP).await; - } -} diff --git a/crates/defguard_vpn_stats_purge/Cargo.toml b/crates/defguard_vpn_stats_purge/Cargo.toml new file mode 100644 index 0000000000..620de32a49 --- /dev/null +++ b/crates/defguard_vpn_stats_purge/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "defguard_vpn_stats_purge" +version = "0.0.0" +edition.workspace = true +license-file.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +defguard_common.workspace = true + +chrono.workspace = true +humantime.workspace = true +sqlx.workspace = true +tokio.workspace = true +tracing.workspace = true + diff --git a/crates/defguard_vpn_stats_purge/src/lib.rs b/crates/defguard_vpn_stats_purge/src/lib.rs new file mode 100644 index 0000000000..cd4af73485 --- /dev/null +++ b/crates/defguard_vpn_stats_purge/src/lib.rs @@ -0,0 +1,137 @@ +use std::time::Duration; + +use chrono::{DateTime, NaiveDateTime, TimeDelta, Utc}; +use humantime::format_duration; +use sqlx::{PgExecutor, PgPool, query, query_scalar}; +use tokio::time::sleep; +use tracing::{debug, error, info, instrument}; + +// How long to sleep between loop iterations +const PURGE_LOOP_SLEEP: Duration = Duration::from_secs(300); // 5 minutes + +#[instrument(skip_all)] +pub async fn run_periodic_stats_purge( + pool: PgPool, + stats_purge_frequency: Duration, + stats_purge_threshold: Duration, +) -> Result<(), sqlx::Error> { + info!( + "Starting periodic purge of VPN sessions and related stats older than {} every {}", + format_duration(stats_purge_threshold), + format_duration(stats_purge_frequency) + ); + + loop { + debug!("Checking if stats purge should be executed"); + // check time elapsed since last purge + let time_since_last_purge = time_since_last_purge(&pool).await?; + if match time_since_last_purge { + Some(time_since) => time_since >= stats_purge_frequency, + None => true, + } { + // perform purge + info!("Executing VPN session stats purge"); + match purge_old_sessions(&pool, stats_purge_threshold).await { + Ok(()) => { + let next_purge_timestamp = (Utc::now() + + TimeDelta::from_std(stats_purge_frequency) + .expect("Failed to parse duration")) + .naive_utc(); + info!( + "VPN session stats purge successful. Next purge will be executed at {next_purge_timestamp}" + ); + } + Err(err) => { + error!("Error while purging VPN session stats: {err}"); + } + } + } + + // wait till next iteration + debug!("Sleeping until next iteration"); + sleep(PURGE_LOOP_SLEEP).await; + } +} + +// Check how much time has elapsed since last recorded stats purge +async fn time_since_last_purge<'e, E>(executor: E) -> Result, sqlx::Error> +where + E: PgExecutor<'e>, +{ + debug!("Checking time since last VPN session stats purge"); + + let timestamp = query_scalar!("SELECT MAX(started_at) FROM wireguard_stats_purge") + .fetch_one(executor) + .await?; + + match timestamp { + Some(timestamp) => { + let time_since = Utc::now().signed_duration_since(timestamp.and_utc()); + let time_since = time_since.to_std().expect("Failed to parse duration"); + debug!( + "Time since last stats purge: {}", + format_duration(time_since) + ); + Ok(Some(time_since)) + } + None => Ok(None), + } +} + +/// Delete VPN sessions and related stats older than a configured threshold. +/// This is done to prevent unnecessary table growth. +/// At least one session is retained for each device and location combination, +/// even when older than set threshold to generate last connection info for VPN overview. +async fn purge_old_sessions( + pool: &PgPool, + stats_purge_threshold: Duration, +) -> Result<(), sqlx::Error> { + let start = Utc::now(); + info!( + "Purging VPN sessions older than {}", + format_duration(stats_purge_threshold) + ); + + let threshold = (Utc::now() + - TimeDelta::from_std(stats_purge_threshold).expect("Failed to parse duration")) + .naive_utc(); + + let result = query!( + "DELETE FROM vpn_client_session \ + WHERE created_at < $1 \ + AND (device_id, location_id, created_at) NOT IN ( \ + SELECT device_id, location_id, MAX(created_at) \ + FROM vpn_client_session \ + GROUP BY device_id, location_id)", + threshold + ) + .execute(pool) + .await?; + + let end = Utc::now(); + let rows_count = result.rows_affected(); + + info!("Removed {rows_count} old records from wireguard_peer_stats",); + + // Store successful stats purge in database. + record_stats_purge(pool, start, end, threshold, rows_count as i64).await?; + + Ok(()) +} + +async fn record_stats_purge<'e, E>( + executor: E, + start: DateTime, + end: DateTime, + removal_threshold: NaiveDateTime, + records_removed: i64, +) -> Result<(), sqlx::Error> +where + E: PgExecutor<'e>, +{ + debug!("Recording successful VPN session stats purge in database"); + query!("INSERT INTO wireguard_stats_purge (started_at, finished_at, removal_threshold, records_removed) VALUES ($1, $2, $3, $4)", + start.naive_utc(), end.naive_utc(), removal_threshold, records_removed).execute(executor).await?; + + Ok(()) +} diff --git a/deny.toml b/deny.toml index 6cee259570..46fbff37c3 100644 --- a/deny.toml +++ b/deny.toml @@ -165,6 +165,10 @@ exceptions = [ "AGPL-3.0-only", "AGPL-3.0-or-later", ], crate = "defguard_generator" }, + { allow = [ + "AGPL-3.0-only", + "AGPL-3.0-or-later", + ], crate = "defguard_vpn_stats_purge" }, ] # Some crates don't have (easily) machine readable licensing information, diff --git a/flake.lock b/flake.lock index 96f53666ca..b3064ed172 100644 --- a/flake.lock +++ b/flake.lock @@ -32,11 +32,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1769170682, - "narHash": "sha256-oMmN1lVQU0F0W2k6OI3bgdzp2YOHWYUAw79qzDSjenU=", + "lastModified": 1769789167, + "narHash": "sha256-kKB3bqYJU5nzYeIROI82Ef9VtTbu4uA3YydSk/Bioa8=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "c5296fdd05cfa2c187990dd909864da9658df755", + "rev": "62c8382960464ceb98ea593cb8321a2cf8f9e3e5", "type": "github" }, "original": { @@ -74,11 +74,11 @@ ] }, "locked": { - "lastModified": 1769396217, - "narHash": "sha256-YNzh46h8fby49yOIB40lNoQ9ucVoXe1bHVwkZ4AwGe0=", + "lastModified": 1770001842, + "narHash": "sha256-ZAyTeILfdWwDp1nuF0RK3McBduMi49qnJvrS+3Ezpac=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "e9bcd12156a577ac4e47d131c14dc0293cc9c8c2", + "rev": "5018343419ea808f8a413241381976b7e60951f2", "type": "github" }, "original": {