From e3af328d34856a0ac6df9f0054a3bf747127a7b1 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Sat, 24 Jan 2026 09:06:36 +0100 Subject: [PATCH 01/14] remove ProxyRouter --- crates/defguard_proxy_manager/src/lib.rs | 95 ++---------------------- 1 file changed, 6 insertions(+), 89 deletions(-) diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index 6057dea7be..3bf8cd6afd 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, str::FromStr, sync::{Arc, RwLock}, time::Duration, @@ -117,60 +116,6 @@ pub enum ProxyError { ConnectionTimeout(String), } -/// Maintains routing state for proxy-specific responses by associating -/// correlation tokens with the proxy senders that should receive them. -#[derive(Default)] -struct ProxyRouter { - response_map: HashMap>>, -} - -impl ProxyRouter { - /// Records the proxy sender associated with a request that expects a routed response. - pub(crate) fn register_request( - &mut self, - request: &CoreRequest, - sender: &UnboundedSender, - ) { - match &request.payload { - // Mobile-assisted MFA completion responses must go to the proxy that owns the WebSocket - // so it can send the preshared key. - // Corresponds to the `core_response::Payload::ClientMfaFinish(response)` response. - // https://github.com/DefGuard/defguard/issues/1700 - Some(core_request::Payload::ClientMfaTokenValidation(request)) => { - self.response_map - .insert(request.token.clone(), vec![sender.clone()]); - } - Some(core_request::Payload::ClientMfaFinish(request)) => { - if let Some(senders) = self.response_map.get_mut(&request.token) { - senders.push(sender.clone()); - } - } - _ => {} - } - } - - /// Determines whether the given `CoreResponse` must be routed to a specific proxy instance. - pub(crate) fn route_response( - &mut self, - response: &CoreResponse, - ) -> Option>> { - #[allow(clippy::single_match)] - match &response.payload { - // Mobile-assisted MFA completion responses must go to the proxy that owns the WebSocket - // so it can send the preshared key. - // Corresponds to the `core_request::Payload::ClientMfaTokenValidation(request)` request. - // https://github.com/DefGuard/defguard/issues/1700 - Some(core_response::Payload::ClientMfaFinish(response)) => { - if let Some(ref token) = response.token { - return self.response_map.remove(token); - } - } - _ => {} - } - None - } -} - /// Coordinates communication between the Core and multiple proxy instances. /// /// Responsibilities include: @@ -181,7 +126,6 @@ pub struct ProxyManager { pool: PgPool, tx: ProxyTxSet, incompatible_components: Arc>, - router: Arc>, } impl ProxyManager { @@ -194,7 +138,6 @@ impl ProxyManager { pool, tx, incompatible_components, - router: Arc::default(), } } @@ -208,14 +151,7 @@ impl ProxyManager { let mut proxies: Vec = Proxy::all(&self.pool) .await? .iter() - .map(|proxy| { - ProxyServer::from_proxy( - proxy, - self.pool.clone(), - &self.tx, - Arc::clone(&self.router), - ) - }) + .map(|proxy| ProxyServer::from_proxy(proxy, self.pool.clone(), &self.tx)) .collect::>()?; debug!("Retrieved {} proxies from the DB", proxies.len()); @@ -223,8 +159,7 @@ impl ProxyManager { if let Some(ref url) = server_config().proxy_url { debug!("Adding proxy from cli arg: {url}"); let url = Url::from_str(url)?; - let proxy = - ProxyServer::new(self.pool.clone(), url, &self.tx, Arc::clone(&self.router)); + let proxy = ProxyServer::new(self.pool.clone(), url, &self.tx); proxies.push(proxy); } @@ -287,33 +222,25 @@ struct ProxyServer { pool: PgPool, /// gRPC servers services: ProxyServices, - /// Router shared between proxies and the proxy manager - router: Arc>, /// Proxy server gRPC URL url: Url, } impl ProxyServer { - pub fn new(pool: PgPool, url: Url, tx: &ProxyTxSet, router: Arc>) -> Self { + pub fn new(pool: PgPool, url: Url, tx: &ProxyTxSet) -> Self { // Instantiate gRPC servers. let services = ProxyServices::new(&pool, tx); Self { pool, services, - router, url, } } - fn from_proxy( - proxy: &Proxy, - pool: PgPool, - tx: &ProxyTxSet, - router: Arc>, - ) -> Result { + fn from_proxy(proxy: &Proxy, pool: PgPool, tx: &ProxyTxSet) -> Result { let url = Url::from_str(&format!("http://{}:{}", proxy.address, proxy.port))?; - Ok(Self::new(pool, url, tx, router)) + Ok(Self::new(pool, url, tx)) } fn endpoint(&self, scheme: Scheme) -> Result { @@ -524,10 +451,6 @@ impl ProxyServer { } Ok(Some(received)) => { debug!("Received message from proxy; ID={}", received.id); - self.router - .write() - .unwrap() - .register_request(&received, &tx); let payload = match received.payload { // rpc CodeMfaSetupStart return (CodeMfaSetupStartResponse) Some(core_request::Payload::CodeMfaSetupStart(request)) => { @@ -945,13 +868,7 @@ impl ProxyServer { id: received.id, payload, }; - if let Some(txs) = self.router.write().unwrap().route_response(&req) { - for tx in txs { - let _ = tx.send(req.clone()); - } - } else { - let _ = tx.send(req); - } + let _ = tx.send(req); } Err(err) => { error!("Disconnected from proxy at {}: {err}", self.url); From 1d2af2922f4877c330f26667b57986c7fa925bb1 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Sat, 24 Jan 2026 09:44:18 +0100 Subject: [PATCH 02/14] Handle ClientRemoteMfaFinish message --- .../src/grpc/proxy/client_mfa.rs | 49 +++++++++++++++- crates/defguard_proxy_manager/src/lib.rs | 56 ++++++++++++++++--- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index 6d3432af46..213ed87c59 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -1,4 +1,7 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; use chrono::Utc; use defguard_common::{ @@ -17,13 +20,14 @@ use defguard_mail::Mail; use defguard_proto::proxy::{ self, ClientMfaFinishRequest, ClientMfaFinishResponse, ClientMfaStartRequest, ClientMfaStartResponse, ClientMfaTokenValidationRequest, ClientMfaTokenValidationResponse, - MfaMethod, + ClientRemoteMfaFinishRequest, ClientRemoteMfaFinishResponse, MfaMethod, }; use sqlx::PgPool; use thiserror::Error; use tokio::sync::{ broadcast::Sender, mpsc::{UnboundedSender, error::SendError}, + oneshot, }; use tonic::{Code, Status}; @@ -63,6 +67,7 @@ pub struct ClientMfaServer { mail_tx: UnboundedSender, wireguard_tx: Sender, pub(crate) sessions: HashMap, + remote_mfa_responses: Arc>>>, bidi_event_tx: UnboundedSender, } @@ -73,12 +78,14 @@ impl ClientMfaServer { mail_tx: UnboundedSender, wireguard_tx: Sender, bidi_event_tx: UnboundedSender, + remote_mfa_responses: Arc>>>, ) -> Self { Self { pool, mail_tx, wireguard_tx, bidi_event_tx, + remote_mfa_responses, sessions: HashMap::new(), } } @@ -368,6 +375,30 @@ impl ClientMfaServer { Ok(()) } + #[instrument(skip_all)] + pub async fn finish_remote_client_mfa_login( + &mut self, + request: ClientRemoteMfaFinishRequest, + ) -> Result { + debug!("Finishing desktop client login: {request:?}"); + let (tx, rx) = oneshot::channel(); + self.remote_mfa_responses + .write() + .expect("Failed to write-lock ClientMfaServer::remote_mfa_responses") + .insert(request.token.clone(), tx); + + // TODO(jck) do we need timeout here? memory leaks possible? + let preshared_key = rx.await.map_err(|err| { + error!("Remote MFA responses channel failed: {err:?}"); + Status::internal("Remote MFA responses channel failed") + })?; + + Ok(ClientRemoteMfaFinishResponse { + token: request.token, + preshared_key, + }) + } + #[instrument(skip_all)] pub async fn finish_client_mfa_login( &mut self, @@ -616,7 +647,7 @@ impl ClientMfaServer { network_info: vec![DeviceNetworkInfo { network_id: location.id, device_wireguard_ips: network_device.wireguard_ips, - preshared_key: network_device.preshared_key, + preshared_key: network_device.preshared_key.clone(), is_authorized: network_device.is_authorized, }], }; @@ -660,6 +691,18 @@ impl ClientMfaServer { Status::internal("unexpected error") })?; + // If there is a desktop client websocket waiting for the preshared key, send it. + if let (Some(tx), Some(ref preshared_key)) = ( + self.remote_mfa_responses + .write() + .expect("Failed to write-lock ClientMfaServer::remote_mfa_responses") + .remove(&request.token), + network_device.preshared_key, + ) { + // TODO(jck) error handling + let _ = tx.send(preshared_key.clone()); + } + Ok(response) } } diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index 3bf8cd6afd..c28e0a86b7 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, str::FromStr, sync::{Arc, RwLock}, time::Duration, @@ -50,6 +51,7 @@ use tokio::{ sync::{ broadcast::Sender, mpsc::{self, UnboundedSender}, + oneshot, }, task::JoinSet, time::sleep, @@ -147,11 +149,19 @@ impl ProxyManager { /// such as routing state and compatibility tracking. pub async fn run(self) -> Result<(), ProxyError> { debug!("ProxyManager starting"); + let remote_mfa_responses = Arc::default(); // Retrieve proxies from DB. let mut proxies: Vec = Proxy::all(&self.pool) .await? .iter() - .map(|proxy| ProxyServer::from_proxy(proxy, self.pool.clone(), &self.tx)) + .map(|proxy| { + ProxyServer::from_proxy( + proxy, + self.pool.clone(), + &self.tx, + Arc::clone(&remote_mfa_responses), + ) + }) .collect::>()?; debug!("Retrieved {} proxies from the DB", proxies.len()); @@ -159,7 +169,7 @@ impl ProxyManager { if let Some(ref url) = server_config().proxy_url { debug!("Adding proxy from cli arg: {url}"); let url = Url::from_str(url)?; - let proxy = ProxyServer::new(self.pool.clone(), url, &self.tx); + let proxy = ProxyServer::new(self.pool.clone(), url, &self.tx, remote_mfa_responses); proxies.push(proxy); } @@ -227,9 +237,14 @@ struct ProxyServer { } impl ProxyServer { - pub fn new(pool: PgPool, url: Url, tx: &ProxyTxSet) -> Self { + pub fn new( + pool: PgPool, + url: Url, + tx: &ProxyTxSet, + remote_mfa_responses: Arc>>>, + ) -> Self { // Instantiate gRPC servers. - let services = ProxyServices::new(&pool, tx); + let services = ProxyServices::new(&pool, tx, remote_mfa_responses); Self { pool, @@ -238,9 +253,14 @@ impl ProxyServer { } } - fn from_proxy(proxy: &Proxy, pool: PgPool, tx: &ProxyTxSet) -> Result { + fn from_proxy( + proxy: &Proxy, + pool: PgPool, + tx: &ProxyTxSet, + remote_mfa_responses: Arc>>>, + ) -> Result { let url = Url::from_str(&format!("http://{}:{}", proxy.address, proxy.port))?; - Ok(Self::new(pool, url, tx)) + Ok(Self::new(pool, url, tx, remote_mfa_responses)) } fn endpoint(&self, scheme: Scheme) -> Result { @@ -641,6 +661,23 @@ impl ProxyServer { } } // rpc ClientMfaFinish (ClientMfaFinishRequest) returns (ClientMfaFinishResponse) + Some(core_request::Payload::ClientRemoteMfaFinish(request)) => { + match self + .services + .client_mfa + .finish_remote_client_mfa_login(request) + .await + { + Ok(response_payload) => Some( + core_response::Payload::ClientRemoteMfaFinish(response_payload), + ), + Err(err) => { + error!("Client MFA finish error: {err}"); + Some(core_response::Payload::CoreError(err.into())) + } + } + } + // rpc ClientMfaFinish (ClientMfaFinishRequest) returns (ClientMfaFinishResponse) Some(core_request::Payload::ClientMfaFinish(request)) => { match self .services @@ -897,7 +934,11 @@ struct ProxyServices { } impl ProxyServices { - pub fn new(pool: &PgPool, tx: &ProxyTxSet) -> Self { + pub fn new( + pool: &PgPool, + tx: &ProxyTxSet, + remote_mfa_responses: Arc>>>, + ) -> Self { let enrollment = EnrollmentServer::new( pool.clone(), tx.wireguard.clone(), @@ -911,6 +952,7 @@ impl ProxyServices { tx.mail.clone(), tx.wireguard.clone(), tx.bidi_events.clone(), + remote_mfa_responses, ); let polling = PollingServer::new(pool.clone()); From 55954fe7673199cb8b3a88371c6105c3bc10e982 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Sat, 24 Jan 2026 17:07:31 +0100 Subject: [PATCH 03/14] sessions behind Arc> --- .../src/enterprise/grpc/desktop_client_mfa.rs | 53 +++++++++---- .../src/grpc/proxy/client_mfa.rs | 77 +++++++++++-------- crates/defguard_proxy_manager/src/lib.rs | 23 +++++- 3 files changed, 103 insertions(+), 50 deletions(-) diff --git a/crates/defguard_core/src/enterprise/grpc/desktop_client_mfa.rs b/crates/defguard_core/src/enterprise/grpc/desktop_client_mfa.rs index fc975e5425..e23c402c20 100644 --- a/crates/defguard_core/src/enterprise/grpc/desktop_client_mfa.rs +++ b/crates/defguard_core/src/enterprise/grpc/desktop_client_mfa.rs @@ -42,7 +42,13 @@ impl ClientMfaServer { let pubkey = Self::parse_token(&token)?; // fetch login session - let Some(session) = self.sessions.get(&pubkey).cloned() else { + let Some(session) = self + .sessions + .read() + .expect("Failed to read-lock ClientMfaServer::sessions") + .get(&pubkey) + .cloned() + else { debug!("Client login session not found"); return Err(Status::invalid_argument("login session not found")); }; @@ -62,7 +68,10 @@ impl ClientMfaServer { if method != MfaMethod::Oidc { debug!("Invalid MFA method for OIDC authentication: {method:?}"); - self.sessions.remove(&pubkey); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .remove(&pubkey); return Err(Status::invalid_argument("invalid MFA method")); } @@ -81,7 +90,10 @@ impl ClientMfaServer { }) { Ok(url) => url, Err(status) => { - self.sessions.remove(&pubkey); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .remove(&pubkey); self.emit_event(BidiStreamEvent { context, event: BidiStreamEventType::DesktopClientMfa(Box::new( @@ -102,7 +114,10 @@ impl ClientMfaServer { // if thats not our user, prevent login if claims_user.id != user.id { info!("User {claims_user} tried to use OIDC MFA for another user: {user}"); - self.sessions.remove(&pubkey); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .remove(&pubkey); self.emit_event(BidiStreamEvent { context, event: BidiStreamEventType::DesktopClientMfa(Box::new( @@ -123,7 +138,10 @@ impl ClientMfaServer { } Err(err) => { info!("Failed to verify OIDC code: {err}"); - self.sessions.remove(&pubkey); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .remove(&pubkey); self.emit_event(BidiStreamEvent { context, event: BidiStreamEventType::DesktopClientMfa(Box::new( @@ -139,17 +157,20 @@ impl ClientMfaServer { } } - self.sessions.insert( - pubkey.clone(), - ClientLoginSession { - method, - device: device.clone(), - location: location.clone(), - user: user.clone(), - openid_auth_completed: true, - biometric_challenge: None, - }, - ); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .insert( + pubkey.clone(), + ClientLoginSession { + method, + device: device.clone(), + location: location.clone(), + user: user.clone(), + openid_auth_completed: true, + biometric_challenge: None, + }, + ); Ok(()) } diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index 213ed87c59..3fce16c23d 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -53,7 +53,7 @@ impl From for Status { } #[derive(Clone)] -pub(crate) struct ClientLoginSession { +pub struct ClientLoginSession { pub(crate) method: MfaMethod, pub(crate) location: WireguardNetwork, pub(crate) device: Device, @@ -66,7 +66,7 @@ pub struct ClientMfaServer { pub(crate) pool: PgPool, mail_tx: UnboundedSender, wireguard_tx: Sender, - pub(crate) sessions: HashMap, + pub(crate) sessions: Arc>>, remote_mfa_responses: Arc>>>, bidi_event_tx: UnboundedSender, } @@ -78,15 +78,16 @@ impl ClientMfaServer { mail_tx: UnboundedSender, wireguard_tx: Sender, bidi_event_tx: UnboundedSender, - remote_mfa_responses: Arc>>>, + remote_mfa_responses: Arc>>>, + sessions: Arc>>, ) -> Self { Self { pool, mail_tx, wireguard_tx, bidi_event_tx, - remote_mfa_responses, - sessions: HashMap::new(), + remote_mfa_responses, + sessions, } } @@ -124,7 +125,11 @@ impl ClientMfaServer { request: ClientMfaTokenValidationRequest, ) -> Result { let pubkey = Self::parse_token(&request.token)?; - let session_active = self.sessions.contains_key(&pubkey); + let session_active = self + .sessions + .read() + .expect("Failed to read-lock ClientMfaServer::sessions") + .contains_key(&pubkey); Ok(ClientMfaTokenValidationResponse { token_valid: session_active, }) @@ -319,17 +324,20 @@ impl ClientMfaServer { .map(|challenge| challenge.challenge.clone()); // store login session - self.sessions.insert( - request.pubkey, - ClientLoginSession { - method: selected_method, - location, - device, - user, - openid_auth_completed: false, - biometric_challenge, - }, - ); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .insert( + request.pubkey, + ClientLoginSession { + method: selected_method, + location, + device, + user, + openid_auth_completed: false, + biometric_challenge, + }, + ); Ok(ClientMfaStartResponse { token, @@ -410,7 +418,13 @@ impl ClientMfaServer { let pubkey = Self::parse_token(&request.token)?; // fetch login session - let Some(session) = self.sessions.get(&pubkey) else { + let Some(session) = self + .sessions + .read() + .expect("Failed to read-lock ClientMfaServer::sessions") + .get(&pubkey) + .cloned() + else { error!("Client login session not found"); return Err(Status::invalid_argument("login session not found")); }; @@ -467,7 +481,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "Signed challenge rejected".to_string(), }, )), @@ -502,7 +516,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "Signed challenge rejected".to_string(), }, )), @@ -522,7 +536,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "TOTP code not provided in request".to_string(), }, )), @@ -537,7 +551,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "invalid TOTP code".to_string(), }, )), @@ -556,7 +570,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "email MFA code not provided in request".to_string(), }, )), @@ -571,7 +585,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "invalid email MFA code".to_string(), }, )), @@ -580,7 +594,7 @@ impl ClientMfaServer { } } MfaMethod::Oidc => { - if !*openid_auth_completed { + if !openid_auth_completed { debug!( "User {user} tried to finish OIDC MFA login but they haven't completed \ the OIDC authentication yet." @@ -591,7 +605,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Failed { location: location.clone(), device: device.clone(), - method: *method, + method, message: "tried to finish OIDC MFA login but they haven't \ completed OIDC authentication yet" .to_string(), @@ -669,7 +683,7 @@ impl ClientMfaServer { DesktopClientMfaEvent::Connected { location: location.clone(), device: device.clone(), - method: *method, + method, }, )), })?; @@ -683,7 +697,10 @@ impl ClientMfaServer { }; // remove login session from map - self.sessions.remove(&pubkey); + self.sessions + .write() + .expect("Failed to write-lock ClientMfaServer::sessions") + .remove(&pubkey); // commit transaction transaction.commit().await.map_err(|_| { @@ -691,7 +708,7 @@ impl ClientMfaServer { Status::internal("unexpected error") })?; - // If there is a desktop client websocket waiting for the preshared key, send it. + // If there is a desktop client websocket waiting for the preshared key, send it. if let (Some(tx), Some(ref preshared_key)) = ( self.remote_mfa_responses .write() @@ -699,7 +716,7 @@ impl ClientMfaServer { .remove(&request.token), network_device.preshared_key, ) { - // TODO(jck) error handling + // TODO(jck) error handling let _ = tx.send(preshared_key.clone()); } diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index c28e0a86b7..abfdd7f0ab 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -29,7 +29,10 @@ use defguard_core::{ ldap::utils::ldap_update_user_state, }, events::BidiStreamEvent, - grpc::{gateway::events::GatewayEvent, proxy::client_mfa::ClientMfaServer}, + grpc::{ + gateway::events::GatewayEvent, + proxy::client_mfa::{ClientLoginSession, ClientMfaServer}, + }, version::{IncompatibleComponents, IncompatibleProxyData, is_proxy_version_supported}, }; use defguard_mail::Mail; @@ -150,6 +153,7 @@ impl ProxyManager { pub async fn run(self) -> Result<(), ProxyError> { debug!("ProxyManager starting"); let remote_mfa_responses = Arc::default(); + let sessions = Arc::default(); // Retrieve proxies from DB. let mut proxies: Vec = Proxy::all(&self.pool) .await? @@ -160,6 +164,7 @@ impl ProxyManager { self.pool.clone(), &self.tx, Arc::clone(&remote_mfa_responses), + Arc::clone(&sessions), ) }) .collect::>()?; @@ -169,7 +174,13 @@ impl ProxyManager { if let Some(ref url) = server_config().proxy_url { debug!("Adding proxy from cli arg: {url}"); let url = Url::from_str(url)?; - let proxy = ProxyServer::new(self.pool.clone(), url, &self.tx, remote_mfa_responses); + let proxy = ProxyServer::new( + self.pool.clone(), + url, + &self.tx, + remote_mfa_responses, + sessions, + ); proxies.push(proxy); } @@ -242,9 +253,10 @@ impl ProxyServer { url: Url, tx: &ProxyTxSet, remote_mfa_responses: Arc>>>, + sessions: Arc>>, ) -> Self { // Instantiate gRPC servers. - let services = ProxyServices::new(&pool, tx, remote_mfa_responses); + let services = ProxyServices::new(&pool, tx, remote_mfa_responses, sessions); Self { pool, @@ -258,9 +270,10 @@ impl ProxyServer { pool: PgPool, tx: &ProxyTxSet, remote_mfa_responses: Arc>>>, + sessions: Arc>>, ) -> Result { let url = Url::from_str(&format!("http://{}:{}", proxy.address, proxy.port))?; - Ok(Self::new(pool, url, tx, remote_mfa_responses)) + Ok(Self::new(pool, url, tx, remote_mfa_responses, sessions)) } fn endpoint(&self, scheme: Scheme) -> Result { @@ -938,6 +951,7 @@ impl ProxyServices { pool: &PgPool, tx: &ProxyTxSet, remote_mfa_responses: Arc>>>, + sessions: Arc>>, ) -> Self { let enrollment = EnrollmentServer::new( pool.clone(), @@ -953,6 +967,7 @@ impl ProxyServices { tx.wireguard.clone(), tx.bidi_events.clone(), remote_mfa_responses, + sessions, ); let polling = PollingServer::new(pool.clone()); From 8cd6ef904588740c78998dbc88e9d39815962e0e Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Sun, 25 Jan 2026 15:09:10 +0100 Subject: [PATCH 04/14] update protos --- proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto b/proto index ec48aca943..1469aba842 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit ec48aca9438e7cdcb4fcdb01ce6dcb5dac7f8dd3 +Subproject commit 1469aba84232c333e355f0db3d29a1666ac9bad5 From c67743092ca92cfabefbd0df76488b31af1d16b6 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 10:31:35 +0100 Subject: [PATCH 05/14] wip wait for remote mfa in new tokio thread --- .../src/grpc/proxy/client_mfa.rs | 45 ++++++++++++++----- crates/defguard_proxy_manager/src/lib.rs | 24 +++++----- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index 3fce16c23d..f180c92bc5 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -20,7 +20,7 @@ use defguard_mail::Mail; use defguard_proto::proxy::{ self, ClientMfaFinishRequest, ClientMfaFinishResponse, ClientMfaStartRequest, ClientMfaStartResponse, ClientMfaTokenValidationRequest, ClientMfaTokenValidationResponse, - ClientRemoteMfaFinishRequest, ClientRemoteMfaFinishResponse, MfaMethod, + ClientRemoteMfaFinishRequest, ClientRemoteMfaFinishResponse, CoreResponse, MfaMethod, }; use sqlx::PgPool; use thiserror::Error; @@ -387,7 +387,10 @@ impl ClientMfaServer { pub async fn finish_remote_client_mfa_login( &mut self, request: ClientRemoteMfaFinishRequest, - ) -> Result { + response_tx: UnboundedSender, + request_id: u64, + // ) -> Result { + ) -> Result<(), Status> { debug!("Finishing desktop client login: {request:?}"); let (tx, rx) = oneshot::channel(); self.remote_mfa_responses @@ -395,16 +398,36 @@ impl ClientMfaServer { .expect("Failed to write-lock ClientMfaServer::remote_mfa_responses") .insert(request.token.clone(), tx); - // TODO(jck) do we need timeout here? memory leaks possible? - let preshared_key = rx.await.map_err(|err| { - error!("Remote MFA responses channel failed: {err:?}"); - Status::internal("Remote MFA responses channel failed") - })?; + tokio::spawn(async move { + // TODO(jck) do we need timeout here? memory leaks possible? + // let preshared_key = rx.await.map_err(|err| { + // error!("Remote MFA responses channel failed: {err:?}"); + // // Status::internal("Remote MFA responses channel failed") + // }); + match rx.await { + Ok(preshared_key) => { + let req = CoreResponse { + id: request_id, + payload: Some(proxy::core_response::Payload::ClientRemoteMfaFinish( + ClientRemoteMfaFinishResponse { + token: request.token, + preshared_key, + }, + )), + }; + let _ = response_tx.send(req); + } + Err(err) => { + error!("Remote MFA responses channel failed: {err:?}"); + } + } + }); - Ok(ClientRemoteMfaFinishResponse { - token: request.token, - preshared_key, - }) + // Ok(ClientRemoteMfaFinishResponse { + // token: request.token, + // preshared_key, + // }) + Ok(()) } #[instrument(skip_all)] diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index abfdd7f0ab..ebab49caca 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -484,6 +484,7 @@ impl ProxyServer { } Ok(Some(received)) => { debug!("Received message from proxy; ID={}", received.id); + error!("Received message from proxy; ID={}", received.id); let payload = match received.payload { // rpc CodeMfaSetupStart return (CodeMfaSetupStartResponse) Some(core_request::Payload::CodeMfaSetupStart(request)) => { @@ -678,14 +679,15 @@ impl ProxyServer { match self .services .client_mfa - .finish_remote_client_mfa_login(request) + .finish_remote_client_mfa_login(request, tx.clone(), received.id) .await { - Ok(response_payload) => Some( - core_response::Payload::ClientRemoteMfaFinish(response_payload), - ), + // Ok(response_payload) => Some( + // core_response::Payload::ClientRemoteMfaFinish(response_payload), + // ), + Ok(response_payload) => None, Err(err) => { - error!("Client MFA finish error: {err}"); + error!("Client MFA remote finish error: {err}"); Some(core_response::Payload::CoreError(err.into())) } } @@ -914,11 +916,13 @@ impl ProxyServer { None => None, }; - let req = CoreResponse { - id: received.id, - payload, - }; - let _ = tx.send(req); + if let Some(payload) = payload { + let req = CoreResponse { + id: received.id, + payload: Some(payload), + }; + let _ = tx.send(req); + } } Err(err) => { error!("Disconnected from proxy at {}: {err}", self.url); From a4ea3bde0bc8314f8368c55c273568b64f525aef Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 10:39:19 +0100 Subject: [PATCH 06/14] cleanup --- crates/defguard_core/src/grpc/proxy/client_mfa.rs | 11 +---------- crates/defguard_proxy_manager/src/lib.rs | 8 ++------ 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index f180c92bc5..1e21264ec1 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -389,7 +389,6 @@ impl ClientMfaServer { request: ClientRemoteMfaFinishRequest, response_tx: UnboundedSender, request_id: u64, - // ) -> Result { ) -> Result<(), Status> { debug!("Finishing desktop client login: {request:?}"); let (tx, rx) = oneshot::channel(); @@ -399,11 +398,7 @@ impl ClientMfaServer { .insert(request.token.clone(), tx); tokio::spawn(async move { - // TODO(jck) do we need timeout here? memory leaks possible? - // let preshared_key = rx.await.map_err(|err| { - // error!("Remote MFA responses channel failed: {err:?}"); - // // Status::internal("Remote MFA responses channel failed") - // }); + // TODO(jck) do we need a timeout here? memory leaks possible? match rx.await { Ok(preshared_key) => { let req = CoreResponse { @@ -423,10 +418,6 @@ impl ClientMfaServer { } }); - // Ok(ClientRemoteMfaFinishResponse { - // token: request.token, - // preshared_key, - // }) Ok(()) } diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index ebab49caca..1c4a4204a7 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -484,7 +484,6 @@ impl ProxyServer { } Ok(Some(received)) => { debug!("Received message from proxy; ID={}", received.id); - error!("Received message from proxy; ID={}", received.id); let payload = match received.payload { // rpc CodeMfaSetupStart return (CodeMfaSetupStartResponse) Some(core_request::Payload::CodeMfaSetupStart(request)) => { @@ -682,12 +681,9 @@ impl ProxyServer { .finish_remote_client_mfa_login(request, tx.clone(), received.id) .await { - // Ok(response_payload) => Some( - // core_response::Payload::ClientRemoteMfaFinish(response_payload), - // ), - Ok(response_payload) => None, + Ok(()) => None, Err(err) => { - error!("Client MFA remote finish error: {err}"); + error!("Client remote MFA finish error: {err}"); Some(core_response::Payload::CoreError(err.into())) } } From 7c0ddc2361e373b89b1a9dadd2f4fa4c3c49e308 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 10:42:55 +0100 Subject: [PATCH 07/14] update proto --- proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto b/proto index 1469aba842..246f5f856e 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 1469aba84232c333e355f0db3d29a1666ac9bad5 +Subproject commit 246f5f856e3ed3d6549897c8a0a84ed46b6165fc From 2c7886997206f625c19ab4790fd3b24dcdca0487 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 11:08:34 +0100 Subject: [PATCH 08/14] remove unused proto field --- crates/defguard_core/src/grpc/proxy/client_mfa.rs | 5 +---- proto | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index 1e21264ec1..5b8699456c 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -404,10 +404,7 @@ impl ClientMfaServer { let req = CoreResponse { id: request_id, payload: Some(proxy::core_response::Payload::ClientRemoteMfaFinish( - ClientRemoteMfaFinishResponse { - token: request.token, - preshared_key, - }, + ClientRemoteMfaFinishResponse { preshared_key }, )), }; let _ = response_tx.send(req); diff --git a/proto b/proto index 246f5f856e..ba933c4c1f 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 246f5f856e3ed3d6549897c8a0a84ed46b6165fc +Subproject commit ba933c4c1fa3edb7c0e3dbafa123f9446b3f9533 From b0b850421b3dc046968cbc4899b34df92ba8489b Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 12:25:28 +0100 Subject: [PATCH 09/14] timeout on remote mfa rx channel read --- .../src/grpc/proxy/client_mfa.rs | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index 5b8699456c..d7e30e068e 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::{Arc, RwLock}, + sync::{Arc, RwLock}, time::Duration, }; use chrono::Utc; @@ -24,11 +24,11 @@ use defguard_proto::proxy::{ }; use sqlx::PgPool; use thiserror::Error; -use tokio::sync::{ +use tokio::{sync::{ broadcast::Sender, mpsc::{UnboundedSender, error::SendError}, oneshot, -}; +}, time}; use tonic::{Code, Status}; use crate::{ @@ -40,6 +40,9 @@ use crate::{ const CLIENT_SESSION_TIMEOUT: u64 = 60 * 5; // 10 minutes +// How much time the user has to approve remote MFA with mobile device +const REMOTE_AUTH_TIMEOUT: Duration = Duration::from_secs(60); + #[derive(Debug, Error)] pub enum ClientMfaServerError { #[error("gRPC event channel error: {0}")] @@ -397,20 +400,24 @@ impl ClientMfaServer { .expect("Failed to write-lock ClientMfaServer::remote_mfa_responses") .insert(request.token.clone(), tx); + // Spawn a task that waits for remote MFA process to conclude to get the preshared key. tokio::spawn(async move { - // TODO(jck) do we need a timeout here? memory leaks possible? - match rx.await { - Ok(preshared_key) => { + match time::timeout(REMOTE_AUTH_TIMEOUT, rx).await { + Ok(Ok(preshared_key)) => { let req = CoreResponse { id: request_id, payload: Some(proxy::core_response::Payload::ClientRemoteMfaFinish( ClientRemoteMfaFinishResponse { preshared_key }, )), }; + // Once the key is here, send it back to proxy. let _ = response_tx.send(req); } + Ok(Err(err)) => { + error!("Remote MFA response channel failed: {err:?}"); + } Err(err) => { - error!("Remote MFA responses channel failed: {err:?}"); + error!("Remote MFA process with request_id {request_id} timed out: {err:?}"); } } }); From 9b3b62b3adf013742a5337dae21cf6c42a44a668 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 12:28:06 +0100 Subject: [PATCH 10/14] remove todo comment --- crates/defguard_core/src/grpc/proxy/client_mfa.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index d7e30e068e..d544304bba 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -734,7 +734,6 @@ impl ClientMfaServer { .remove(&request.token), network_device.preshared_key, ) { - // TODO(jck) error handling let _ = tx.send(preshared_key.clone()); } From 2a4239a1c3d26f9096d878c0f129c84ac97019fa Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 13:15:09 +0100 Subject: [PATCH 11/14] warn instead of err on remote mfa timeouts --- .../src/grpc/proxy/client_mfa.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index d544304bba..c9ccf7b91f 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, - sync::{Arc, RwLock}, time::Duration, + sync::{Arc, RwLock}, + time::Duration, }; use chrono::Utc; @@ -24,11 +25,14 @@ use defguard_proto::proxy::{ }; use sqlx::PgPool; use thiserror::Error; -use tokio::{sync::{ - broadcast::Sender, - mpsc::{UnboundedSender, error::SendError}, - oneshot, -}, time}; +use tokio::{ + sync::{ + broadcast::Sender, + mpsc::{UnboundedSender, error::SendError}, + oneshot, + }, + time, +}; use tonic::{Code, Status}; use crate::{ @@ -416,8 +420,8 @@ impl ClientMfaServer { Ok(Err(err)) => { error!("Remote MFA response channel failed: {err:?}"); } - Err(err) => { - error!("Remote MFA process with request_id {request_id} timed out: {err:?}"); + Err(_) => { + warn!("Remote MFA process with request_id {request_id} timed out"); } } }); From e33c61ef6475aa8a36f2dc47a80a042371779910 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 13:25:18 +0100 Subject: [PATCH 12/14] rename remote mfa proto rpc --- crates/defguard_core/src/grpc/proxy/client_mfa.rs | 15 ++++++++------- crates/defguard_proxy_manager/src/lib.rs | 6 +++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/defguard_core/src/grpc/proxy/client_mfa.rs b/crates/defguard_core/src/grpc/proxy/client_mfa.rs index c9ccf7b91f..667314eaf4 100644 --- a/crates/defguard_core/src/grpc/proxy/client_mfa.rs +++ b/crates/defguard_core/src/grpc/proxy/client_mfa.rs @@ -19,9 +19,10 @@ use defguard_common::{ }; use defguard_mail::Mail; use defguard_proto::proxy::{ - self, ClientMfaFinishRequest, ClientMfaFinishResponse, ClientMfaStartRequest, - ClientMfaStartResponse, ClientMfaTokenValidationRequest, ClientMfaTokenValidationResponse, - ClientRemoteMfaFinishRequest, ClientRemoteMfaFinishResponse, CoreResponse, MfaMethod, + self, AwaitRemoteMfaFinishRequest, AwaitRemoteMfaFinishResponse, ClientMfaFinishRequest, + ClientMfaFinishResponse, ClientMfaStartRequest, ClientMfaStartResponse, + ClientMfaTokenValidationRequest, ClientMfaTokenValidationResponse, CoreResponse, MfaMethod, + core_response::Payload, }; use sqlx::PgPool; use thiserror::Error; @@ -391,9 +392,9 @@ impl ClientMfaServer { } #[instrument(skip_all)] - pub async fn finish_remote_client_mfa_login( + pub async fn await_remote_mfa_login( &mut self, - request: ClientRemoteMfaFinishRequest, + request: AwaitRemoteMfaFinishRequest, response_tx: UnboundedSender, request_id: u64, ) -> Result<(), Status> { @@ -410,8 +411,8 @@ impl ClientMfaServer { Ok(Ok(preshared_key)) => { let req = CoreResponse { id: request_id, - payload: Some(proxy::core_response::Payload::ClientRemoteMfaFinish( - ClientRemoteMfaFinishResponse { preshared_key }, + payload: Some(Payload::AwaitRemoteMfaFinish( + AwaitRemoteMfaFinishResponse { preshared_key }, )), }; // Once the key is here, send it back to proxy. diff --git a/crates/defguard_proxy_manager/src/lib.rs b/crates/defguard_proxy_manager/src/lib.rs index 1c4a4204a7..9ca6f173f9 100644 --- a/crates/defguard_proxy_manager/src/lib.rs +++ b/crates/defguard_proxy_manager/src/lib.rs @@ -673,12 +673,12 @@ impl ProxyServer { } } } - // rpc ClientMfaFinish (ClientMfaFinishRequest) returns (ClientMfaFinishResponse) - Some(core_request::Payload::ClientRemoteMfaFinish(request)) => { + // rpc ClientRemoteMfaFinish (ClientRemoteMfaFinishRequest) returns (ClientRemoteMfaFinishResponse) + Some(core_request::Payload::AwaitRemoteMfaFinish(request)) => { match self .services .client_mfa - .finish_remote_client_mfa_login(request, tx.clone(), received.id) + .await_remote_mfa_login(request, tx.clone(), received.id) .await { Ok(()) => None, From db94fdfc6ddc5348fa9cd873054ddbff8fad3b95 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 13:34:06 +0100 Subject: [PATCH 13/14] update proto --- proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto b/proto index ba933c4c1f..3a18dbc52f 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit ba933c4c1fa3edb7c0e3dbafa123f9446b3f9533 +Subproject commit 3a18dbc52f098f37d5d78e88904631faf8db78f3 From b18598ec99f345363ad790e975e69b2e7c55225a Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 26 Jan 2026 13:56:47 +0100 Subject: [PATCH 14/14] udpate protos --- proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto b/proto index 3a18dbc52f..0b982922c4 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 3a18dbc52f098f37d5d78e88904631faf8db78f3 +Subproject commit 0b982922c4dab3304a8cb01aed1d8cee806600b7