From 9a17ec876ffb617e6cf8559ac87120f1f3e45c08 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 18 Feb 2026 06:58:42 +0100 Subject: [PATCH 1/4] deleting a location cascade-deletes gateways --- crates/defguard_core/src/handlers/wireguard.rs | 6 ------ ...20260218054705_[2.0.0]_gateway_cascade_delete.down.sql | 7 +++++++ .../20260218054705_[2.0.0]_gateway_cascade_delete.up.sql | 8 ++++++++ 3 files changed, 15 insertions(+), 6 deletions(-) create mode 100644 migrations/20260218054705_[2.0.0]_gateway_cascade_delete.down.sql create mode 100644 migrations/20260218054705_[2.0.0]_gateway_cascade_delete.up.sql diff --git a/crates/defguard_core/src/handlers/wireguard.rs b/crates/defguard_core/src/handlers/wireguard.rs index 286d50fa56..26dbbe7d79 100644 --- a/crates/defguard_core/src/handlers/wireguard.rs +++ b/crates/defguard_core/src/handlers/wireguard.rs @@ -456,12 +456,6 @@ pub(crate) async fn delete_network( let network = find_network(network_id, &appstate.pool).await?; let network_name = network.name.clone(); let mut transaction = appstate.pool.begin().await?; - let network_devices = network - .get_devices_by_type(&mut *transaction, DeviceType::Network) - .await?; - for device in network_devices { - device.delete(&mut *transaction).await?; - } network.clone().delete(&mut *transaction).await?; transaction.commit().await?; appstate.send_wireguard_event(GatewayEvent::NetworkDeleted(network_id, network_name)); diff --git a/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.down.sql b/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.down.sql new file mode 100644 index 0000000000..d438bcd26b --- /dev/null +++ b/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.down.sql @@ -0,0 +1,7 @@ +ALTER TABLE gateway +DROP CONSTRAINT gateway_network_id_fkey; + +ALTER TABLE gateway +ADD CONSTRAINT gateway_network_id_fkey +FOREIGN KEY (network_id) +REFERENCES wireguard_network(id); diff --git a/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.up.sql b/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.up.sql new file mode 100644 index 0000000000..bf0b321e3b --- /dev/null +++ b/migrations/20260218054705_[2.0.0]_gateway_cascade_delete.up.sql @@ -0,0 +1,8 @@ +ALTER TABLE gateway +DROP CONSTRAINT gateway_network_id_fkey; + +ALTER TABLE gateway +ADD CONSTRAINT gateway_network_id_fkey +FOREIGN KEY (network_id) +REFERENCES wireguard_network(id) +ON DELETE CASCADE; From 7dae5e1de766d79d806e028a309a3f1e60ca6f23 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 18 Feb 2026 08:13:59 +0100 Subject: [PATCH 2/4] keep stats in GatewayManager, refactor run_handler closure --- crates/defguard/src/main.rs | 8 +- crates/defguard_gateway_manager/src/lib.rs | 103 ++++++++++++--------- 2 files changed, 62 insertions(+), 49 deletions(-) diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index b38f497619..c37871db5a 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -180,16 +180,12 @@ async fn main() -> Result<(), anyhow::Error> { proxy_control_rx, ); - let mut gateway_manager = GatewayManager::default(); + let mut gateway_manager = GatewayManager::new(pool.clone(), gateway_tx.clone(), peer_stats_tx); // run services tokio::select! { res = proxy_manager.run() => error!("ProxyManager returned early: {res:?}"), - res = gateway_manager.run( - pool.clone(), - gateway_tx.clone(), - peer_stats_tx, - ) => error!("Gateway gRPC stream returned early: {res:?}"), + res = gateway_manager.run() => error!("GatewayManager returned early: {res:?}"), res = run_grpc_server( Arc::clone(&worker_state), pool.clone(), diff --git a/crates/defguard_gateway_manager/src/lib.rs b/crates/defguard_gateway_manager/src/lib.rs index b100560eb1..508f7bc8e9 100644 --- a/crates/defguard_gateway_manager/src/lib.rs +++ b/crates/defguard_gateway_manager/src/lib.rs @@ -15,12 +15,12 @@ use defguard_proto::gateway::gateway_client::GatewayClient; use defguard_version::client::ClientVersionInterceptor; use sqlx::{PgPool, postgres::PgListener}; use tokio::{ - sync::{broadcast::Sender, mpsc::UnboundedSender}, + sync::{broadcast::Sender, mpsc::UnboundedSender, watch::Receiver}, task::{AbortHandle, JoinSet}, }; use tonic::{Request, service::interceptor::InterceptedService, transport::Channel}; -use crate::handler::GatewayHandler; +use crate::{error::GatewayError, handler::GatewayHandler}; #[macro_use] extern crate tracing; @@ -39,22 +39,35 @@ const TEN_SECS: Duration = Duration::from_secs(10); type Client = GatewayClient>; -#[derive(Default)] pub struct GatewayManager { clients: Arc>>, + pool: PgPool, + handlers: JoinSet>, + // TODO(jck) GatewayTxSet + events_tx: Sender, + peer_stats_tx: UnboundedSender, } impl GatewayManager { - /// Bi-directional gRPC stream for communication with Defguard Gateway. - pub async fn run( - &mut self, + pub fn new( pool: PgPool, events_tx: Sender, peer_stats_tx: UnboundedSender, - ) -> Result<(), anyhow::Error> { + ) -> Self { + Self { + clients: Arc::default(), + handlers: JoinSet::new(), + pool, + events_tx, + peer_stats_tx, + } + } + + /// Bi-directional gRPC stream for communication with Defguard Gateway. + pub async fn run(&mut self) -> Result<(), anyhow::Error> { let (certs_tx, certs_rx) = tokio::sync::watch::channel(Arc::new(HashMap::new())); - certs::refresh_certs(&pool, &certs_tx).await; - let refresh_pool = pool.clone(); + certs::refresh_certs(&self.pool, &certs_tx).await; + let refresh_pool = self.pool.clone(); tokio::spawn(async move { loop { certs::refresh_certs(&refresh_pool, &certs_tx).await; @@ -62,41 +75,15 @@ impl GatewayManager { } }); let mut abort_handles = HashMap::new(); - - let mut tasks = JoinSet::new(); - // Helper closure to launch `GatewayHandler`. - // TODO: Store arguments in GatewayManager and rewrite this to method - let mut launch_gateway_handler = |gateway: Gateway, - clients: Arc>>| - -> Result { - let mut gateway_handler = GatewayHandler::new( - gateway, - pool.clone(), - events_tx.clone(), - peer_stats_tx.clone(), - certs_rx.clone(), - )?; - let abort_handle = tasks.spawn(async move { - loop { - if let Err(err) = gateway_handler - .handle_connection(Arc::clone(&clients)) - .await - { - error!("Gateway connection error: {err}, retrying in 5 seconds..."); - tokio::time::sleep(GATEWAY_RECONNECT_DELAY).await; - } - } - }); - Ok(abort_handle) - }; - for gateway in Gateway::all(&pool).await? { + for gateway in Gateway::all(&self.pool).await? { let id = gateway.id; - let abort_handle = launch_gateway_handler(gateway, Arc::clone(&self.clients))?; + let abort_handle = + self.run_handler(gateway, Arc::clone(&self.clients), certs_rx.clone())?; abort_handles.insert(id, abort_handle); } // Observe gateway URL changes. - let mut listener = PgListener::connect_with(&pool).await?; + let mut listener = PgListener::connect_with(&self.pool).await?; listener.listen(GATEWAY_TABLE_TRIGGER).await?; while let Ok(notification) = listener.recv().await { let payload = notification.payload(); @@ -106,7 +93,7 @@ impl GatewayManager { if let Some(new) = gateway_notification.new { let id = new.id; let abort_handle = - launch_gateway_handler(new, Arc::clone(&self.clients))?; + self.run_handler(new, Arc::clone(&self.clients), certs_rx.clone())?; abort_handles.insert(id, abort_handle); } } @@ -124,8 +111,11 @@ impl GatewayManager { ); abort_handle.abort(); let id = new.id; - let abort_handle = - launch_gateway_handler(new, Arc::clone(&self.clients))?; + let abort_handle = self.run_handler( + new, + Arc::clone(&self.clients), + certs_rx.clone(), + )?; abort_handles.insert(id, abort_handle); } else { warn!("Cannot find {old} on the list of connected gateways"); @@ -173,10 +163,37 @@ impl GatewayManager { } } - while let Some(Ok(_result)) = tasks.join_next().await { + while let Some(Ok(_result)) = self.handlers.join_next().await { debug!("Gateway gRPC task has ended"); } Ok(()) } + + fn run_handler( + &mut self, + gateway: Gateway, + clients: Arc>>, + certs_rx: Receiver>>, + ) -> Result { + let mut gateway_handler = GatewayHandler::new( + gateway, + self.pool.clone(), + self.events_tx.clone(), + self.peer_stats_tx.clone(), + certs_rx.clone(), + )?; + let abort_handle = self.handlers.spawn(async move { + loop { + if let Err(err) = gateway_handler + .handle_connection(Arc::clone(&clients)) + .await + { + error!("Gateway connection error: {err}, retrying in 5 seconds..."); + tokio::time::sleep(GATEWAY_RECONNECT_DELAY).await; + } + } + }); + Ok(abort_handle) + } } From 408e55be33135453bac5b415a3aaa7869bbbc322 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 18 Feb 2026 08:22:20 +0100 Subject: [PATCH 3/4] GatewayTxSet --- crates/defguard/src/main.rs | 10 +++--- crates/defguard_gateway_manager/src/lib.rs | 36 ++++++++++++++-------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index c37871db5a..b4287f01d0 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -30,7 +30,7 @@ use defguard_core::{ }; use defguard_event_logger::{message::EventLoggerMessage, run_event_logger}; use defguard_event_router::{RouterReceiverSet, run_event_router}; -use defguard_gateway_manager::GatewayManager; +use defguard_gateway_manager::{GatewayManager, GatewayTxSet}; use defguard_proxy_manager::{ProxyManager, ProxyTxSet}; use defguard_session_manager::{events::SessionManagerEvent, run_session_manager}; use defguard_setup::setup::run_setup_web_server; @@ -172,15 +172,17 @@ async fn main() -> Result<(), anyhow::Error> { } let (proxy_control_tx, proxy_control_rx) = channel::(100); - let proxy_tx = ProxyTxSet::new(gateway_tx.clone(), bidi_event_tx.clone()); let proxy_manager = ProxyManager::new( pool.clone(), - proxy_tx, + ProxyTxSet::new(gateway_tx.clone(), bidi_event_tx.clone()), Arc::clone(&incompatible_components), proxy_control_rx, ); - let mut gateway_manager = GatewayManager::new(pool.clone(), gateway_tx.clone(), peer_stats_tx); + let mut gateway_manager = GatewayManager::new( + pool.clone(), + GatewayTxSet::new(gateway_tx.clone(), peer_stats_tx), + ); // run services tokio::select! { diff --git a/crates/defguard_gateway_manager/src/lib.rs b/crates/defguard_gateway_manager/src/lib.rs index 508f7bc8e9..5cd8755aef 100644 --- a/crates/defguard_gateway_manager/src/lib.rs +++ b/crates/defguard_gateway_manager/src/lib.rs @@ -43,23 +43,17 @@ pub struct GatewayManager { clients: Arc>>, pool: PgPool, handlers: JoinSet>, - // TODO(jck) GatewayTxSet - events_tx: Sender, - peer_stats_tx: UnboundedSender, + tx: GatewayTxSet, } impl GatewayManager { - pub fn new( - pool: PgPool, - events_tx: Sender, - peer_stats_tx: UnboundedSender, - ) -> Self { + #[must_use] + pub fn new(pool: PgPool, tx: GatewayTxSet) -> Self { Self { clients: Arc::default(), handlers: JoinSet::new(), pool, - events_tx, - peer_stats_tx, + tx, } } @@ -179,8 +173,8 @@ impl GatewayManager { let mut gateway_handler = GatewayHandler::new( gateway, self.pool.clone(), - self.events_tx.clone(), - self.peer_stats_tx.clone(), + self.tx.events.clone(), + self.tx.peer_stats.clone(), certs_rx.clone(), )?; let abort_handle = self.handlers.spawn(async move { @@ -197,3 +191,21 @@ impl GatewayManager { Ok(abort_handle) } } + +/// Shared set of outbound channels that gateway instances use to forward +/// events, notifications, and side effects to Core components. +#[derive(Clone)] +pub struct GatewayTxSet { + events: Sender, + peer_stats: UnboundedSender, +} + +impl GatewayTxSet { + #[must_use] + pub const fn new( + events: Sender, + peer_stats: UnboundedSender, + ) -> Self { + Self { events, peer_stats } + } +} From af6c3e36d9f9547c27eb8a4cefe1f17075b3dc9c Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 18 Feb 2026 08:46:25 +0100 Subject: [PATCH 4/4] revert device removal changes --- crates/defguard_core/src/handlers/wireguard.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/defguard_core/src/handlers/wireguard.rs b/crates/defguard_core/src/handlers/wireguard.rs index 26dbbe7d79..286d50fa56 100644 --- a/crates/defguard_core/src/handlers/wireguard.rs +++ b/crates/defguard_core/src/handlers/wireguard.rs @@ -456,6 +456,12 @@ pub(crate) async fn delete_network( let network = find_network(network_id, &appstate.pool).await?; let network_name = network.name.clone(); let mut transaction = appstate.pool.begin().await?; + let network_devices = network + .get_devices_by_type(&mut *transaction, DeviceType::Network) + .await?; + for device in network_devices { + device.delete(&mut *transaction).await?; + } network.clone().delete(&mut *transaction).await?; transaction.commit().await?; appstate.send_wireguard_event(GatewayEvent::NetworkDeleted(network_id, network_name));