diff --git a/Cargo.lock b/Cargo.lock index e9530c1277..536d2550b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1429,6 +1429,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "tracing-subscriber", "trait-variant", "uaparser", "utoipa", diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index 0971412a5e..8b25fe36d8 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -25,6 +25,7 @@ use defguard_core::{ events::{ApiEvent, BidiStreamEvent}, grpc::{GatewayEvent, WorkerState, run_grpc_server}, init_dev_env, init_vpn_location, run_web_server, + setup_logs::CoreSetupLogLayer, utility_thread::run_utility_thread, version::IncompatibleComponents, }; @@ -43,7 +44,7 @@ use tokio::sync::{ broadcast, mpsc::{channel, unbounded_channel}, }; -use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[macro_use] extern crate tracing; @@ -54,11 +55,15 @@ async fn main() -> Result<(), anyhow::Error> { dotenvy::dotenv().ok(); } let mut config = DefGuardConfig::new(); + let log_filter = format!( + "{},defguard_core::handlers::component_setup=debug", + config.log_level + ); - let subscriber = tracing_subscriber::registry(); + let subscriber = tracing_subscriber::registry().with(CoreSetupLogLayer); defguard_version::tracing::with_version_formatters( &defguard_version::Version::parse(VERSION)?, - &config.log_level, + &log_filter, subscriber, ) .init(); diff --git a/crates/defguard_core/Cargo.toml b/crates/defguard_core/Cargo.toml index 0927fce948..6a4c176b61 100644 --- a/crates/defguard_core/Cargo.toml +++ b/crates/defguard_core/Cargo.toml @@ -70,6 +70,7 @@ totp-lite = { workspace = true } tower = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } trait-variant = { workspace = true } # openapi utoipa = { workspace = true } diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 0f6fe8e34d..378b962ce2 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -1,4 +1,9 @@ -use std::{convert::Infallible, time::Duration}; +use std::{ + collections::VecDeque, + convert::Infallible, + sync::{Arc, Mutex}, + time::Duration, +}; use axum::{ Extension, @@ -37,10 +42,12 @@ use tonic::{ service::Interceptor, transport::{Certificate, ClientTlsConfig, Endpoint}, }; +use tracing::Instrument; use crate::{ auth::{AdminOrSetupRole, SessionInfo}, enterprise::is_enterprise_license_active, + setup_logs::scope_setup_logs, version::{MIN_GATEWAY_VERSION, MIN_PROXY_VERSION}, }; @@ -152,13 +159,18 @@ fn set_step_message(next_step: SetupStep) -> Event { struct SetupFlow { last_step: SetupStep, + log_buffer: Arc>>, log_rx: tokio::sync::mpsc::UnboundedReceiver, } impl SetupFlow { - const fn new(log_rx: tokio::sync::mpsc::UnboundedReceiver) -> Self { + fn new( + log_rx: tokio::sync::mpsc::UnboundedReceiver, + log_buffer: Arc>>, + ) -> Self { Self { last_step: SetupStep::CheckingConfiguration, + log_buffer, log_rx, } } @@ -169,7 +181,15 @@ impl SetupFlow { } fn error(&mut self, message: &str) -> Event { - let mut collected_logs = Vec::new(); + error!("{message}"); + + let mut collected_logs = { + let mut guard = self + .log_buffer + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + std::mem::take(&mut *guard).into_iter().collect::>() + }; while let Ok(log) = self.log_rx.try_recv() { collected_logs.push(log); } @@ -178,6 +198,7 @@ impl SetupFlow { } else { Some(collected_logs) }; + error_message(message, self.last_step, logs) } } @@ -193,9 +214,10 @@ pub async fn setup_proxy_tls_stream( proxy_control_tx: Option>>, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - - let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx); + let log_buffer = Arc::new(Mutex::new(VecDeque::new())); + let inner_log_buffer = Arc::clone(&log_buffer); + let inner_stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); // check if tries to connect more then 1 proxy without active enterprise license if !is_enterprise_license_active() { @@ -203,8 +225,7 @@ pub async fn setup_proxy_tls_stream( Ok(current_proxies) => { if !current_proxies.is_empty() { yield Ok(flow.error( - "Enterprise license is required for connecting more \ - then one Edge.", + "Enterprise license is required for connecting more than one Edge.", )); return; } @@ -216,20 +237,14 @@ pub async fn setup_proxy_tls_stream( } } + debug!("License check passed"); + // Step 1: Check configuration yield Ok(flow.step(SetupStep::CheckingConfiguration)); - - match Proxy::find_by_address_port( - &pool, - &request.ip_or_domain, - i32::from(request.grpc_port), - ) - .await - { + match Proxy::find_by_address_port(&pool, &request.ip_or_domain, i32::from(request.grpc_port)).await { Ok(Some(proxy)) => { yield Ok(flow.error(&format!( - "An edge Proxy with address {}:{} is already \ - registered with name \"{}\".", + "An edge Proxy with address {}:{} is already registered with name \"{}\".", request.ip_or_domain, request.grpc_port, proxy.name ))); return; @@ -246,8 +261,9 @@ pub async fn setup_proxy_tls_stream( } } - let url_str = format!("http://{}:{}", request.ip_or_domain, request.grpc_port); + debug!("Configuration check passed"); + let url_str = format!("http://{}:{}", request.ip_or_domain, request.grpc_port); let url = match Url::parse(&url_str) { Ok(u) => u, Err(e) => { @@ -256,7 +272,7 @@ pub async fn setup_proxy_tls_stream( } }; - debug!("Successfully validated Edge address: {url_str}",); + debug!("Successfully validated Edge address: {url_str}"); let endpoint = match Endpoint::from_shared(url_str) { Ok(e) => e, @@ -315,7 +331,6 @@ pub async fn setup_proxy_tls_stream( yield Ok(flow.step(SetupStep::CheckingAvailability)); let version_clone = version.clone(); - let token = match Claims::new( defguard_common::auth::claims::ClaimsType::Gateway, url.to_string(), @@ -335,58 +350,48 @@ pub async fn setup_proxy_tls_stream( let version_interceptor = ClientVersionInterceptor::new(version); let auth_interceptor = AuthInterceptor::new(token); - - let mut client = ProxySetupClient::with_interceptor( - endpoint.connect_lazy(), - move |mut req: Request<()>| { - req = version_interceptor.clone().call(req)?; - auth_interceptor.clone().call(req) - }, - ); + let mut client = ProxySetupClient::with_interceptor(endpoint.connect_lazy(), move |mut req: Request<()>| { + req = version_interceptor.clone().call(req)?; + auth_interceptor.clone().call(req) + }); debug!( "Initiating connection to Edge at {}:{}", request.ip_or_domain, request.grpc_port ); - let response_with_metadata = - match tokio::time::timeout(CONNECTION_TIMEOUT, client.start(())).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - match e.code() { - tonic::Code::Unavailable => { - let error_msg = e.to_string(); - if error_msg.contains("h2 protocol error") - || error_msg.contains("http2 error") - { - yield Ok(flow.error(&format!( - "Failed to connect to Edge at {}:{}: {}. This may indicate that \ - the Edge is already configured with TLS. Please check if the Edge \ - has already been set up.", - request.ip_or_domain, request.grpc_port, e - ))); - } else { - yield Ok(flow.error(&format!( - "Failed to connect to Edge at {}:{}. Please ensure the address \ - and port are correct and that the Edge component is running.", - request.ip_or_domain, request.grpc_port - ))); - } - } - _ => { - yield Ok(flow.error(&format!("Failed to connect to Edge: {e}"))); + let response_with_metadata = match tokio::time::timeout(CONNECTION_TIMEOUT, client.start(())).await { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + match e.code() { + tonic::Code::Unavailable => { + let error_msg = e.to_string(); + if error_msg.contains("h2 protocol error") || error_msg.contains("http2 error") { + yield Ok(flow.error(&format!( + "Failed to connect to Edge at {}:{}: {}. This may indicate that the Edge is already configured with TLS. Please check if the Edge has already been set up.", + request.ip_or_domain, request.grpc_port, e + ))); + } else { + yield Ok(flow.error(&format!( + "Failed to connect to Edge at {}:{}. Please ensure the address and port are correct and that the Edge component is running.", + request.ip_or_domain, request.grpc_port + ))); } } - return; - } - Err(_) => { - yield Ok(flow.error(&format!( - "Connection to Edge at {}:{} timed out after 10 seconds.", - request.ip_or_domain, request.grpc_port - ))); - return; + _ => { + yield Ok(flow.error(&format!("Failed to connect to Edge: {e}"))); + } } - }; + return; + } + Err(_) => { + yield Ok(flow.error(&format!( + "Connection to Edge at {}:{} timed out after 10 seconds.", + request.ip_or_domain, request.grpc_port + ))); + return; + } + }; debug!("Successfully connected to Edge"); @@ -407,8 +412,7 @@ pub async fn setup_proxy_tls_stream( if let Some(proxy_version) = proxy_version { if proxy_version < MIN_PROXY_VERSION { yield Ok(flow.error(&format!( - "Edge version {proxy_version} is older than Core version \ - {version_clone}. Please update the Edge component.", + "Edge version {proxy_version} is older than Core version {version_clone}. Please update the Edge component.", ))); return; } @@ -427,9 +431,7 @@ pub async fn setup_proxy_tls_stream( }; match serde_json::to_string(&response) { - Ok(body) => { - yield Ok(Event::default().data(body)); - } + Ok(body) => yield Ok(Event::default().data(body)), Err(e) => { yield Ok(flow.error(&format!("Failed to serialize version response: {e}"))); return; @@ -441,35 +443,37 @@ pub async fn setup_proxy_tls_stream( } let mut response = response_with_metadata.into_inner(); - - let log_reader_task = tokio::spawn(async move { - while let Some(log_entry) = response.next().await { - match log_entry { - Ok(entry) => { - let level = entry - .level - .strip_prefix("Level(") - .and_then(|s| s.strip_suffix(")")) - .unwrap_or(&entry.level) - .to_uppercase(); - - let formatted = format!( - "{} {} {}: message={}", - entry.timestamp, level, entry.target, entry.message - ); - if log_tx.send(formatted).is_err() { - break; + let spawn_log_buffer = inner_log_buffer.clone(); + let log_reader_task = tokio::spawn( + scope_setup_logs(spawn_log_buffer, async move { + while let Some(log_entry) = response.next().await { + match log_entry { + Ok(entry) => { + let level = entry + .level + .strip_prefix("Level(") + .and_then(|s| s.strip_suffix(")")) + .unwrap_or(&entry.level) + .to_uppercase(); + + let formatted = format!( + "{} {} {}: message={}", + entry.timestamp, level, entry.target, entry.message + ); + if log_tx.send(formatted).is_err() { + break; + } + } + Err(e) => { + let _ = log_tx.send(format!("Error reading log: {e}")); + break; + } } } - Err(e) => { - let _ = log_tx.send(format!("Error reading log: {e}")); - break; - } - } - } - }); + }) + .instrument(tracing::Span::current()), + ); - // Create guard to ensure task is aborted on all exit paths let _log_task_guard = TaskGuard(log_reader_task); // Step 4: Obtain CSR @@ -480,12 +484,7 @@ pub async fn setup_proxy_tls_stream( return; }; - let csr_response = match client - .get_csr(CertificateInfo { - cert_hostname: hostname.to_string(), - }) - .await - { + let csr_response = match client.get_csr(CertificateInfo { cert_hostname: hostname.to_string() }).await { Ok(r) => r.into_inner(), Err(e) => { yield Ok(flow.error(&format!("Failed to obtain CSR: {e}"))); @@ -501,29 +500,22 @@ pub async fn setup_proxy_tls_stream( } }; - debug!( - "Received certificate signing request from Edge for hostname: {hostname}" - ); + debug!("Received certificate signing request from Edge for hostname: {hostname}"); // Step 5: Sign certificate yield Ok(flow.step(SetupStep::SigningCertificate)); let settings = Settings::get_current_settings(); - let Some(ca_cert_der) = settings.ca_cert_der else { yield Ok(flow.error("CA certificate not found in settings")); return; }; - let Some(ca_key_pair) = settings.ca_key_der else { yield Ok(flow.error("CA key pair not found in settings")); return; }; - let ca = match defguard_certs::CertificateAuthority::from_cert_der_key_pair( - &ca_cert_der, - &ca_key_pair, - ) { + let ca = match defguard_certs::CertificateAuthority::from_cert_der_key_pair(&ca_cert_der, &ca_key_pair) { Ok(c) => c, Err(e) => { yield Ok(flow.error(&format!("Failed to create CA: {e}"))); @@ -546,28 +538,21 @@ pub async fn setup_proxy_tls_stream( // Step 6: Configure TLS yield Ok(flow.step(SetupStep::ConfiguringTls)); - let response = DerPayload { - der_data: cert.der().to_vec(), - }; - - if let Err(e) = client.send_cert(response).await { + if let Err(e) = client.send_cert(DerPayload { der_data: cert.der().to_vec() }).await { yield Ok(flow.error(&format!("Failed to send certificate: {e}"))); return; } debug!("Certificate successfully delivered to Edge"); - let defguard_certs::CertificateInfo { - not_after: expiry, - serial, - .. - } = match defguard_certs::CertificateInfo::from_der(cert.der()) { - Ok(dt) => dt, - Err(err) => { - yield Ok(flow.error(&format!("Failed to get certificate expiry: {err}"))); - return; - } - }; + let defguard_certs::CertificateInfo { not_after: expiry, serial, .. } = + match defguard_certs::CertificateInfo::from_der(cert.der()) { + Ok(dt) => dt, + Err(err) => { + yield Ok(flow.error(&format!("Failed to get certificate expiry: {err}"))); + return; + } + }; debug!("Certificate expiry date determined: {expiry}"); @@ -577,7 +562,6 @@ pub async fn setup_proxy_tls_stream( i32::from(request.grpc_port), &session.user.fullname(), ); - proxy.certificate = Some(serial); proxy.certificate_expiry = Some(expiry); @@ -594,11 +578,9 @@ pub async fn setup_proxy_tls_stream( request.common_name, proxy.id ); debug!("Establishing connection to newly configured Edge"); + if let Some(proxy_control_tx) = proxy_control_tx { - if let Err(err) = proxy_control_tx - .send(ProxyControlMessage::StartConnection(proxy.id)) - .await - { + if let Err(err) = proxy_control_tx.send(ProxyControlMessage::StartConnection(proxy.id)).await { yield Ok(flow.error(&format!( "Failed send message to connect to Edge after setup: {err}" ))); @@ -608,26 +590,24 @@ pub async fn setup_proxy_tls_stream( debug!("Edge control channel not available; skipping connection initiation"); } - debug!("Edge setup completed successfully"); + info!("Edge setup completed successfully"); - { - match Wizard::get(&pool).await { - Ok(wizard) => { + match Wizard::get(&pool).await { + Ok(wizard) => { if !wizard.completed { let state = InitialSetupState { step: InitialSetupStep::Confirmation, }; - if let Err(err) = state.save(&pool).await { - yield Ok(flow.error(&format!("Failed to update setup step in wizard: {err}"))); - return; - } - debug!("Initial setup step advanced to 'Confirmation'"); + if let Err(err) = state.save(&pool).await { + yield Ok(flow.error(&format!("Failed to update setup step in wizard: {err}"))); + return; } + debug!("Initial setup step advanced to 'Confirmation'"); } - Err(err) => { - yield Ok(flow.error(&format!("Failed to fetch wizard state: {err}"))); - return; - } + } + Err(err) => { + yield Ok(flow.error(&format!("Failed to fetch wizard state: {err}"))); + return; } } @@ -635,6 +615,17 @@ pub async fn setup_proxy_tls_stream( yield Ok(flow.step(SetupStep::Done)); }; + let adoption_span = tracing::info_span!("proxy_adoption"); + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) + .instrument(adoption_span.clone()) + .await + { + yield item; + } + }; + Sse::new(stream).keep_alive(KeepAlive::default()) } @@ -649,9 +640,10 @@ pub async fn setup_gateway_tls_stream( Extension(pool): Extension, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - - let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx); + let log_buffer = Arc::new(Mutex::new(VecDeque::new())); + let inner_log_buffer = Arc::clone(&log_buffer); + let inner_stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); // check if tries to add more then 1 gateway to network without enterprise license if !is_enterprise_license_active() { @@ -887,33 +879,36 @@ pub async fn setup_gateway_tls_stream( let mut response = response_with_metadata.into_inner(); - let log_reader_task = tokio::spawn(async move { - while let Some(log_entry) = response.next().await { - match log_entry { - Ok(entry) => { - let level = entry.level - .strip_prefix("Level(") - .and_then(|s| s.strip_suffix(")")) - .unwrap_or(&entry.level) - .to_uppercase(); - - let formatted = format!( - "{} {level} {}: message={}", - entry.timestamp, - entry.target, - entry.message - ); - if log_tx.send(formatted).is_err() { + let spawn_log_buffer = inner_log_buffer.clone(); + let log_reader_task = tokio::spawn( + scope_setup_logs(spawn_log_buffer, async move { + while let Some(log_entry) = response.next().await { + match log_entry { + Ok(entry) => { + let level = entry + .level + .strip_prefix("Level(") + .and_then(|s| s.strip_suffix(")")) + .unwrap_or(&entry.level) + .to_uppercase(); + + let formatted = format!( + "{} {level} {}: message={}", + entry.timestamp, entry.target, entry.message + ); + if log_tx.send(formatted).is_err() { + break; + } + } + Err(e) => { + let _ = log_tx.send(format!("Error reading log: {e}")); break; } } - Err(e) => { - let _ = log_tx.send(format!("Error reading log: {e}")); - break; - } } - } - }); + }) + .instrument(tracing::Span::current()), + ); // Create guard to ensure task is aborted on all exit paths let _log_task_guard = TaskGuard(log_reader_task); @@ -1039,5 +1034,16 @@ pub async fn setup_gateway_tls_stream( yield Ok(flow.step(SetupStep::Done)); }; + let adoption_span = tracing::info_span!("gateway_adoption"); + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) + .instrument(adoption_span.clone()) + .await + { + yield item; + } + }; + Sse::new(stream).keep_alive(KeepAlive::default()) } diff --git a/crates/defguard_core/src/lib.rs b/crates/defguard_core/src/lib.rs index 362366d7d7..0197b5cc13 100644 --- a/crates/defguard_core/src/lib.rs +++ b/crates/defguard_core/src/lib.rs @@ -182,6 +182,7 @@ pub mod grpc; pub mod handlers; pub mod headers; pub mod location_management; +pub mod setup_logs; pub mod support; pub mod updates; pub mod user_management; diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs new file mode 100644 index 0000000000..45376606ee --- /dev/null +++ b/crates/defguard_core/src/setup_logs.rs @@ -0,0 +1,73 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; + +use tracing::{Event as TracingEvent, Subscriber}; +use tracing_subscriber::{Layer, layer::Context}; + +pub const MAX_CORE_LOG_LINES: usize = 200; + +tokio::task_local! { + static CORE_SETUP_LOGS: Arc>>; +} + +/// Tracing layer that appends log messages to the active setup log buffer. +#[derive(Clone)] +pub struct CoreSetupLogLayer; + +/// Runs a future with the provided setup log buffer bound to the current task. +pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T +where + F: std::future::Future, +{ + CORE_SETUP_LOGS.scope(buffer, future).await +} + +impl Layer for CoreSetupLogLayer +where + S: Subscriber, +{ + fn on_event(&self, event: &TracingEvent<'_>, _ctx: Context<'_, S>) { + let Some(buffer) = CORE_SETUP_LOGS.try_with(Clone::clone).ok() else { + return; + }; + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + + let metadata = event.metadata(); + let message = visitor.message.unwrap_or_default(); + let Ok(mut guard) = buffer.lock() else { + return; + }; + if guard.len() >= MAX_CORE_LOG_LINES { + guard.pop_front(); + } + guard.push_back(format!( + "{} {}: {}", + metadata.level(), + metadata.target(), + message + )); + } +} + +#[derive(Default)] +struct MessageVisitor { + message: Option, +} + +impl tracing::field::Visit for MessageVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.message = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = Some(format!("{value:?}")); + } + } +} diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs new file mode 100644 index 0000000000..7a381d0ae7 --- /dev/null +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -0,0 +1,181 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Mutex, Once}, +}; + +use defguard_core::setup_logs::{CoreSetupLogLayer, MAX_CORE_LOG_LINES, scope_setup_logs}; +use reqwest::StatusCode; +use serde_json::Value; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +use tracing::{debug, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use super::common::{make_test_client, setup_pool}; + +fn init_tracing_once() { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + tracing_subscriber::registry() + .with(CoreSetupLogLayer) + .try_init() + .ok(); + }); +} + +fn parse_sse_data_events(body: &str) -> Vec { + body.lines() + .filter_map(|line| line.strip_prefix("data: ")) + .map(|line| serde_json::from_str::(line).unwrap()) + .collect() +} + +fn read_logs(buffer: &Arc>>) -> Vec { + buffer + .lock() + .expect("test log buffer mutex poisoned") + .iter() + .cloned() + .collect() +} + +async fn log_from_nested_function() { + info!("nested awaited log"); +} + +#[sqlx::test] +async fn test_proxy_setup_error_includes_core_logs(_: PgPoolOptions, options: PgConnectOptions) { + init_tracing_once(); + + let pool = setup_pool(options).await; + + let (mut client, _) = make_test_client(pool).await; + client.login_user("admin", "pass123").await; + + let response = client + .get("/api/v1/proxy/setup/stream?ip_or_domain=bad%20host&grpc_port=50051&common_name=edge") + .send() + .await; + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await; + let events = parse_sse_data_events(&body); + + let error_event = events + .iter() + .find(|event| event.get("error") == Some(&Value::Bool(true))) + .unwrap(); + + let logs = error_event + .get("logs") + .and_then(Value::as_array) + .expect("expected `logs` array in proxy setup error event"); + assert!( + !logs.is_empty(), + "expected Core logs to be present in proxy setup error event" + ); + + let has_core_error = logs.iter().filter_map(Value::as_str).any(|line| { + line.contains("ERROR") && line.contains("defguard_core::handlers::component_setup") + }); + assert!( + has_core_error, + "expected at least one captured Core tracing line in error logs" + ); +} + +#[sqlx::test] +async fn test_gateway_setup_error_includes_core_logs(_: PgPoolOptions, options: PgConnectOptions) { + init_tracing_once(); + + let pool = setup_pool(options).await; + + let (mut client, _) = make_test_client(pool).await; + client.login_user("admin", "pass123").await; + + let response = client + .get("/api/v1/network/1/gateways/setup?ip_or_domain=bad%20host&grpc_port=50051&common_name=gateway") + .send() + .await; + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await; + let events = parse_sse_data_events(&body); + + let error_event = events + .iter() + .find(|event| event.get("error") == Some(&Value::Bool(true))) + .unwrap(); + + let logs = error_event + .get("logs") + .and_then(Value::as_array) + .expect("expected `logs` array in gateway setup error event"); + assert!( + !logs.is_empty(), + "expected Core logs to be present in gateway setup error event" + ); + + let has_core_error = logs.iter().filter_map(Value::as_str).any(|line| { + line.contains("ERROR") && line.contains("defguard_core::handlers::component_setup") + }); + assert!( + has_core_error, + "expected at least one captured Core tracing line in error logs" + ); +} + +#[tokio::test] +async fn scope_setup_logs_captures_logs_inside_scope() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(VecDeque::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + info!("captured in setup scope"); + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("captured in setup scope")); +} + +#[tokio::test] +async fn nested_awaited_calls_are_captured() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(VecDeque::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + log_from_nested_function().await; + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("nested awaited log")); +} + +#[tokio::test] +async fn buffer_is_bounded_to_max_core_log_lines() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(VecDeque::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + for idx in 0..(MAX_CORE_LOG_LINES + 5) { + debug!("bounded log line {idx}"); + } + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), MAX_CORE_LOG_LINES); + assert!(logs[0].contains("bounded log line 5")); + assert!( + logs[MAX_CORE_LOG_LINES - 1] + .contains(&format!("bounded log line {}", MAX_CORE_LOG_LINES + 4)) + ); +} diff --git a/crates/defguard_core/tests/integration/api/mod.rs b/crates/defguard_core/tests/integration/api/mod.rs index 8a879d4f81..7989933e61 100644 --- a/crates/defguard_core/tests/integration/api/mod.rs +++ b/crates/defguard_core/tests/integration/api/mod.rs @@ -2,6 +2,7 @@ mod acl; mod api_tokens; mod auth; mod common; +mod component_setup; mod enrollment; mod enterprise_settings; mod forward_auth; diff --git a/tools/defguard_generator/src/vpn_session_stats.rs b/tools/defguard_generator/src/vpn_session_stats.rs index de6b4e093e..ff9519b84e 100644 --- a/tools/defguard_generator/src/vpn_session_stats.rs +++ b/tools/defguard_generator/src/vpn_session_stats.rs @@ -70,7 +70,7 @@ pub async fn generate_vpn_session_stats( let devices = prepare_user_devices(&pool, &mut rng, &user, config.devices_per_user as usize).await?; - let mut used_ips = location.all_used_ips_for_network(&mut *transaction).await?; + let mut used_ips = location.all_used_ips_for_network(&mut transaction).await?; // assign devices to the network if not already assigned for device in &devices { if WireguardNetworkDevice::find(&mut *transaction, device.id, location.id)