From 652f3eb560689f727a6ef903a92416bfe6c6d799 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Mon, 26 Jan 2026 13:26:51 +0100 Subject: [PATCH 1/8] setup a basic process of storing per-gateway stats --- .../src/session_state.rs | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/crates/defguard_session_manager/src/session_state.rs b/crates/defguard_session_manager/src/session_state.rs index c11815c5cf..4c8113f5c8 100644 --- a/crates/defguard_session_manager/src/session_state.rs +++ b/crates/defguard_session_manager/src/session_state.rs @@ -20,6 +20,21 @@ use crate::{ events::{SessionManagerEvent, SessionManagerEventContext, SessionManagerEventType}, }; +/// Helper map to store latest stats update for each gateway in a given location +pub(crate) struct LastGatewayUpdate(HashMap); + +impl LastGatewayUpdate { + fn new() -> Self { + Self(HashMap::new()) + } + + /// Store latest stats for a given gateway + fn update(&mut self, session_stats: VpnSessionStats) { + let gateway_id = session_stats.gateway_id; + unimplemented!() + } +} + struct LastStatsUpdate { collected_at: NaiveDateTime, latest_handshake: NaiveDateTime, @@ -66,37 +81,42 @@ impl From> for LastStatsUpdate { /// State of a specific VPN client session pub(crate) struct SessionState { session_id: Id, - last_stats_update: Option, + last_stats_update: LastGatewayUpdate, } impl SessionState { fn new(session_id: Id) -> Self { Self { session_id, - last_stats_update: None, + last_stats_update: LastGatewayUpdate::new(), } } + fn try_get_last_stats_update<'a>(&self, gateway_id: Id) -> Option<&'a LastStatsUpdate> { + unimplemented!() + } + /// Updates session stats based on received peer update pub(crate) async fn update_stats( &mut self, transaction: &mut PgConnection, peer_stats_update: PeerStatsUpdate, ) -> Result<(), SessionManagerError> { - // get previous stats if available and calculate transfer change - let (upload_diff, download_diff) = match &self.last_stats_update { - Some(last_stats_update) => { - // validate current update against latest value - last_stats_update.validate_update(&peer_stats_update)?; - - // calculate transfer change - ( - peer_stats_update.upload as i64 - last_stats_update.total_upload, - peer_stats_update.download as i64 - last_stats_update.total_download, - ) - } - None => (0, 0), - }; + // get previous stats for a given gateway if available and calculate transfer change + let (upload_diff, download_diff) = + match self.try_get_last_stats_update(peer_stats_update.gateway_id) { + Some(last_stats_update) => { + // validate current update against latest value + last_stats_update.validate_update(&peer_stats_update)?; + + // calculate transfer change + ( + peer_stats_update.upload as i64 - last_stats_update.total_upload, + peer_stats_update.download as i64 - last_stats_update.total_download, + ) + } + None => (0, 0), + }; let vpn_session_stats = VpnSessionStats::new( self.session_id, @@ -114,7 +134,7 @@ impl SessionState { let stats = vpn_session_stats.save(transaction).await?; // update latest stats - self.last_stats_update = Some(LastStatsUpdate::from(stats)); + self.last_stats_update.update(stats); Ok(()) } @@ -124,7 +144,7 @@ impl From<&VpnClientSession> for SessionState { fn from(value: &VpnClientSession) -> Self { Self { session_id: value.id, - last_stats_update: None, + last_stats_update: LastGatewayUpdate::new(), } } } @@ -197,7 +217,7 @@ impl ActiveSessionsMap { // try to fetch latest available stats for a given session if let Some(latest_stats) = db_session.try_get_latest_stats(transaction).await? { - session_state.last_stats_update = Some(LastStatsUpdate::from(latest_stats)); + session_state.last_stats_update.update(latest_stats); }; // put session state in map From 3143324f30cf9c7f33891b83b2abc2085f7058f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Mon, 26 Jan 2026 17:03:46 +0100 Subject: [PATCH 2/8] add note for later --- crates/defguard_session_manager/src/session_state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/defguard_session_manager/src/session_state.rs b/crates/defguard_session_manager/src/session_state.rs index 4c8113f5c8..553dac06be 100644 --- a/crates/defguard_session_manager/src/session_state.rs +++ b/crates/defguard_session_manager/src/session_state.rs @@ -216,6 +216,7 @@ impl ActiveSessionsMap { let mut session_state = SessionState::from(&db_session); // try to fetch latest available stats for a given session + // FIXME: fetch latest stats for each gateway if let Some(latest_stats) = db_session.try_get_latest_stats(transaction).await? { session_state.last_stats_update.update(latest_stats); }; From 821ad488b470c8d8b8f990747115a96bc7b4b8c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Mon, 26 Jan 2026 17:46:28 +0100 Subject: [PATCH 3/8] handle gateway stats updates --- crates/defguard_session_manager/src/session_state.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/defguard_session_manager/src/session_state.rs b/crates/defguard_session_manager/src/session_state.rs index 553dac06be..a816724cac 100644 --- a/crates/defguard_session_manager/src/session_state.rs +++ b/crates/defguard_session_manager/src/session_state.rs @@ -29,12 +29,18 @@ impl LastGatewayUpdate { } /// Store latest stats for a given gateway + /// + /// We assume that at this point the update has already been validated. fn update(&mut self, session_stats: VpnSessionStats) { let gateway_id = session_stats.gateway_id; - unimplemented!() + let latest_stats = LastStatsUpdate::from(session_stats); + + debug!("Replacing latest stats update for gateway {gateway_id} with {latest_stats:?}"); + let _maybe_previous = self.0.insert(gateway_id, latest_stats); } } +#[derive(Debug)] struct LastStatsUpdate { collected_at: NaiveDateTime, latest_handshake: NaiveDateTime, @@ -92,8 +98,8 @@ impl SessionState { } } - fn try_get_last_stats_update<'a>(&self, gateway_id: Id) -> Option<&'a LastStatsUpdate> { - unimplemented!() + fn try_get_last_stats_update<'a>(&'a self, gateway_id: Id) -> Option<&'a LastStatsUpdate> { + self.last_stats_update.0.get(&gateway_id) } /// Updates session stats based on received peer update From 1d735592e4f8bb4ca6df13aa7d90bcf5d29000d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Mon, 26 Jan 2026 18:03:09 +0100 Subject: [PATCH 4/8] fetch latest stats for each gateway --- .../src/db/models/vpn_client_session.rs | 11 ++++++----- .../defguard_session_manager/src/session_state.rs | 15 +++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/crates/defguard_common/src/db/models/vpn_client_session.rs b/crates/defguard_common/src/db/models/vpn_client_session.rs index 3ca957d93b..c5ad74ead2 100644 --- a/crates/defguard_common/src/db/models/vpn_client_session.rs +++ b/crates/defguard_common/src/db/models/vpn_client_session.rs @@ -85,20 +85,21 @@ impl VpnClientSession { .await } - pub async fn try_get_latest_stats<'e, E: sqlx::PgExecutor<'e>>( + /// Returns latest stats in a given session for each gateway + pub async fn get_latest_stats_for_all_gateways<'e, E: sqlx::PgExecutor<'e>>( &self, executor: E, - ) -> Result>, SqlxError> { + ) -> Result>, SqlxError> { query_as!( VpnSessionStats, - "SELECT id, session_id, gateway_id, collected_at, latest_handshake, endpoint, \ + "SELECT DISTINCT ON (gateway_id) id, session_id, gateway_id, collected_at, latest_handshake, endpoint, \ total_upload, total_download, upload_diff, download_diff FROM vpn_session_stats \ WHERE session_id = $1 \ - ORDER BY collected_at DESC LIMIT 1", + ORDER BY gateway_id, collected_at DESC", self.id ) - .fetch_optional(executor) + .fetch_all(executor) .await } diff --git a/crates/defguard_session_manager/src/session_state.rs b/crates/defguard_session_manager/src/session_state.rs index a816724cac..80a0305e7a 100644 --- a/crates/defguard_session_manager/src/session_state.rs +++ b/crates/defguard_session_manager/src/session_state.rs @@ -98,7 +98,7 @@ impl SessionState { } } - fn try_get_last_stats_update<'a>(&'a self, gateway_id: Id) -> Option<&'a LastStatsUpdate> { + fn try_get_last_stats_update(&self, gateway_id: Id) -> Option<&LastStatsUpdate> { self.last_stats_update.0.get(&gateway_id) } @@ -221,14 +221,17 @@ impl ActiveSessionsMap { Some(db_session) => { let mut session_state = SessionState::from(&db_session); - // try to fetch latest available stats for a given session - // FIXME: fetch latest stats for each gateway - if let Some(latest_stats) = db_session.try_get_latest_stats(transaction).await? { - session_state.last_stats_update.update(latest_stats); - }; + // fetch latest available stats for each gateway for a given session + let latest_gateway_stats = db_session + .get_latest_stats_for_all_gateways(transaction) + .await?; + for stats in latest_gateway_stats { + session_state.last_stats_update.update(stats); + } // put session state in map let maybe_existing_session = session_map.insert(device_id, session_state); + // if a session exists already there was an error in earlier logic assert!(maybe_existing_session.is_none()); From 7f35e6a799faf80eb2313a9422613f948704611b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Mon, 26 Jan 2026 18:09:28 +0100 Subject: [PATCH 5/8] update query data --- ...268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3.json} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .sqlx/{query-c154aea1df6c3f273a1e7ab9c9a1c5b4da1599de659cd8315e331d6caf203fa4.json => query-b2196d15ed73268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3.json} (78%) diff --git a/.sqlx/query-c154aea1df6c3f273a1e7ab9c9a1c5b4da1599de659cd8315e331d6caf203fa4.json b/.sqlx/query-b2196d15ed73268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3.json similarity index 78% rename from .sqlx/query-c154aea1df6c3f273a1e7ab9c9a1c5b4da1599de659cd8315e331d6caf203fa4.json rename to .sqlx/query-b2196d15ed73268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3.json index e5b2104a6a..eb0d67ae11 100644 --- a/.sqlx/query-c154aea1df6c3f273a1e7ab9c9a1c5b4da1599de659cd8315e331d6caf203fa4.json +++ b/.sqlx/query-b2196d15ed73268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, session_id, gateway_id, collected_at, latest_handshake, endpoint, total_upload, total_download, upload_diff, download_diff\n \tFROM vpn_session_stats WHERE session_id = $1 ORDER BY collected_at DESC LIMIT 1", + "query": "SELECT DISTINCT ON (gateway_id) id, session_id, gateway_id, collected_at, latest_handshake, endpoint, total_upload, total_download, upload_diff, download_diff\n \tFROM vpn_session_stats WHERE session_id = $1 ORDER BY gateway_id, collected_at DESC", "describe": { "columns": [ { @@ -72,5 +72,5 @@ false ] }, - "hash": "c154aea1df6c3f273a1e7ab9c9a1c5b4da1599de659cd8315e331d6caf203fa4" + "hash": "b2196d15ed73268487293a74bb8fd9393571a3f98b1e323f566493e464bbf1e3" } From d074d3fb1bedb4f3331b25cdd11870f57fa98405 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Tue, 27 Jan 2026 13:24:09 +0100 Subject: [PATCH 6/8] fix active sessions query --- crates/defguard_common/src/db/models/vpn_client_session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/defguard_common/src/db/models/vpn_client_session.rs b/crates/defguard_common/src/db/models/vpn_client_session.rs index c5ad74ead2..8050f4ecc7 100644 --- a/crates/defguard_common/src/db/models/vpn_client_session.rs +++ b/crates/defguard_common/src/db/models/vpn_client_session.rs @@ -77,7 +77,7 @@ impl VpnClientSession { "SELECT id, location_id, user_id, device_id, created_at, connected_at, disconnected_at, \ mfa_mode \"mfa_mode: LocationMfaMode\", state \"state: VpnClientSessionState\" \ FROM vpn_client_session \ - WHERE location_id = $1 AND device_id = $2", + WHERE location_id = $1 AND device_id = $2 AND state IN ('new', 'connected')", location_id, device_id ) From d3c320c85e24d45d82213c6eaec29d90983a0c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Tue, 27 Jan 2026 13:25:25 +0100 Subject: [PATCH 7/8] remove old todo --- crates/defguard_session_manager/src/session_state.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/defguard_session_manager/src/session_state.rs b/crates/defguard_session_manager/src/session_state.rs index 80a0305e7a..422a830d75 100644 --- a/crates/defguard_session_manager/src/session_state.rs +++ b/crates/defguard_session_manager/src/session_state.rs @@ -169,7 +169,6 @@ impl SessionMap { } } -// TODO(mwojcik): handle multiple gateways per location /// Helper struct to hold session maps for all locations and object cache to avoid repeated DB queries /// /// Since we want to support HA core deployments this structure From 46eb8f155c7e4c8152035c153d9c88cc62ba01fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Tue, 27 Jan 2026 13:25:59 +0100 Subject: [PATCH 8/8] update query data --- ...739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37.json} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .sqlx/{query-83722331508d9f6347db04c44546ddc6c1c82aad42f16dbda45003f13a1f6e33.json => query-98739c1b3049739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37.json} (91%) diff --git a/.sqlx/query-83722331508d9f6347db04c44546ddc6c1c82aad42f16dbda45003f13a1f6e33.json b/.sqlx/query-98739c1b3049739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37.json similarity index 91% rename from .sqlx/query-83722331508d9f6347db04c44546ddc6c1c82aad42f16dbda45003f13a1f6e33.json rename to .sqlx/query-98739c1b3049739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37.json index 08ecdf129a..043b020675 100644 --- a/.sqlx/query-83722331508d9f6347db04c44546ddc6c1c82aad42f16dbda45003f13a1f6e33.json +++ b/.sqlx/query-98739c1b3049739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, location_id, user_id, device_id, created_at, connected_at, disconnected_at, mfa_mode \"mfa_mode: LocationMfaMode\", state \"state: VpnClientSessionState\" FROM vpn_client_session WHERE location_id = $1 AND device_id = $2", + "query": "SELECT id, location_id, user_id, device_id, created_at, connected_at, disconnected_at, mfa_mode \"mfa_mode: LocationMfaMode\", state \"state: VpnClientSessionState\" FROM vpn_client_session WHERE location_id = $1 AND device_id = $2 AND state IN ('new', 'connected')", "describe": { "columns": [ { @@ -89,5 +89,5 @@ false ] }, - "hash": "83722331508d9f6347db04c44546ddc6c1c82aad42f16dbda45003f13a1f6e33" + "hash": "98739c1b3049739f95c056ef871f0ed200f2b4c10707685ece61e1dbe85c5c37" }