From 18e4dd37648a274d5167d5ae6b6cf1cbf849e4cf Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 5 Mar 2026 11:00:59 +0100 Subject: [PATCH 01/18] working setup core logs for proxy adoption --- Cargo.lock | 1 + crates/defguard/src/main.rs | 5 +- crates/defguard_core/Cargo.toml | 1 + crates/defguard_core/src/adoption_logs.rs | 269 ++++++++++++++++++ .../src/handlers/component_setup.rs | 143 ++++++---- crates/defguard_core/src/lib.rs | 1 + .../tests/integration/api/component_setup.rs | 68 +++++ .../tests/integration/api/mod.rs | 1 + 8 files changed, 435 insertions(+), 54 deletions(-) create mode 100644 crates/defguard_core/src/adoption_logs.rs create mode 100644 crates/defguard_core/tests/integration/api/component_setup.rs diff --git a/Cargo.lock b/Cargo.lock index 4a6f696a65..2910b46b11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1429,6 +1429,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "tracing-subscriber", "trait-variant", "uaparser", "utoipa", diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index 5dd2dec097..a5b3b9792a 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -15,6 +15,7 @@ use defguard_common::{ types::proxy::ProxyControlMessage, }; use defguard_core::{ + adoption_logs::core_adoption_log_layer, auth::failed_login::FailedLoginMap, db::AppEvent, enterprise::{ @@ -40,7 +41,7 @@ use tokio::sync::{ broadcast, mpsc::{channel, unbounded_channel}, }; -use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[macro_use] extern crate tracing; @@ -52,7 +53,7 @@ async fn main() -> Result<(), anyhow::Error> { } let mut config = DefGuardConfig::new(); - let subscriber = tracing_subscriber::registry(); + let subscriber = tracing_subscriber::registry().with(core_adoption_log_layer()); defguard_version::tracing::with_version_formatters( &defguard_version::Version::parse(VERSION)?, &config.log_level, diff --git a/crates/defguard_core/Cargo.toml b/crates/defguard_core/Cargo.toml index 0927fce948..6a4c176b61 100644 --- a/crates/defguard_core/Cargo.toml +++ b/crates/defguard_core/Cargo.toml @@ -70,6 +70,7 @@ totp-lite = { workspace = true } tower = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } trait-variant = { workspace = true } # openapi utoipa = { workspace = true } diff --git a/crates/defguard_core/src/adoption_logs.rs b/crates/defguard_core/src/adoption_logs.rs new file mode 100644 index 0000000000..53ba840bb1 --- /dev/null +++ b/crates/defguard_core/src/adoption_logs.rs @@ -0,0 +1,269 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::{Arc, LazyLock, Mutex}, + time::{Duration, Instant}, +}; + +use tracing::{Event, Subscriber}; +use tracing_subscriber::{ + Layer, + layer::Context, + registry::{LookupSpan, SpanRef}, +}; + +const MAX_LOG_LINES_PER_ADOPTION: usize = 200; +const ADOPTION_LOG_TTL: Duration = Duration::from_secs(20 * 60); + +#[derive(Clone, Default)] +pub struct AdoptionLogRegistry { + inner: Arc>>, +} + +#[derive(Default)] +struct AdoptionLogState { + lines: VecDeque, + touched_at: Option, +} + +impl AdoptionLogRegistry { + pub fn start(&self, adoption_id: &str) { + self.with_map(|map| { + self.cleanup_locked(map); + map.entry(adoption_id.to_owned()) + .or_insert_with(AdoptionLogState::default) + .touch(); + }); + } + + pub fn record(&self, adoption_id: &str, line: String) { + self.with_map(|map| { + self.cleanup_locked(map); + let state = map + .entry(adoption_id.to_owned()) + .or_insert_with(AdoptionLogState::default); + state.push_line(line); + }); + } + + pub fn take(&self, adoption_id: &str) -> Vec { + self.with_map(|map| { + self.cleanup_locked(map); + map.remove(adoption_id) + .map(AdoptionLogState::into_vec) + .unwrap_or_default() + }) + } + + fn with_map(&self, f: impl FnOnce(&mut HashMap) -> R) -> R { + let mut guard = self.inner.lock().expect("adoption log mutex poisoned"); + f(&mut guard) + } + + fn cleanup_locked(&self, map: &mut HashMap) { + let now = Instant::now(); + map.retain(|_, state| state.is_fresh(now)); + } +} + +impl AdoptionLogState { + fn touch(&mut self) { + self.touched_at = Some(Instant::now()); + } + + fn push_line(&mut self, line: String) { + self.touch(); + self.lines.push_back(line); + if self.lines.len() > MAX_LOG_LINES_PER_ADOPTION { + self.lines.pop_front(); + } + } + + fn is_fresh(&self, now: Instant) -> bool { + self.touched_at + .map(|touched_at| now.duration_since(touched_at) <= ADOPTION_LOG_TTL) + .unwrap_or(true) + } + + fn into_vec(self) -> Vec { + self.lines.into_iter().collect() + } +} + +#[derive(Clone)] +pub struct CoreAdoptionLogLayer { + registry: AdoptionLogRegistry, +} + +impl CoreAdoptionLogLayer { + #[must_use] + pub fn new(registry: AdoptionLogRegistry) -> Self { + Self { registry } + } +} + +#[derive(Clone)] +struct AdoptionIdExtension(String); + +impl Layer for CoreAdoptionLogLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: Context<'_, S>, + ) { + let mut visitor = FieldVisitor::default(); + attrs.record(&mut visitor); + + let Some(adoption_id) = visitor.adoption_id else { + return; + }; + + if let Some(span) = ctx.span(id) { + span.extensions_mut() + .insert(AdoptionIdExtension(adoption_id)); + } + } + + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut visitor = FieldVisitor::default(); + event.record(&mut visitor); + + let adoption_id = visitor + .adoption_id + .or_else(|| find_adoption_id_in_scope(&ctx, event)); + + let Some(adoption_id) = adoption_id else { + return; + }; + + let metadata = event.metadata(); + let message = visitor.message.unwrap_or_default(); + let line = format!("{} {}: {}", metadata.level(), metadata.target(), message); + self.registry.record(&adoption_id, line); + } +} + +fn find_adoption_id_in_scope(ctx: &Context<'_, S>, event: &Event<'_>) -> Option +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + let scope = ctx.event_scope(event)?; + scope.from_root().filter_map(adoption_id_from_span).last() +} + +fn adoption_id_from_span(span: SpanRef<'_, S>) -> Option +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + span.extensions() + .get::() + .map(|id| id.0.clone()) +} + +#[derive(Default)] +struct FieldVisitor { + adoption_id: Option, + message: Option, +} + +impl tracing::field::Visit for FieldVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + match field.name() { + "adoption_id" => { + self.adoption_id = Some(value.to_owned()); + } + "message" => { + self.message = Some(value.to_owned()); + } + _ => {} + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + let value = format!("{value:?}"); + self.record_str(field, &value); + } +} + +static REGISTRY: LazyLock = LazyLock::new(AdoptionLogRegistry::default); + +pub fn start_adoption(adoption_id: &str) { + REGISTRY.start(adoption_id); +} + +#[must_use] +pub fn take_logs(adoption_id: &str) -> Vec { + REGISTRY.take(adoption_id) +} + +#[must_use] +pub fn core_adoption_log_layer() -> CoreAdoptionLogLayer { + CoreAdoptionLogLayer::new(REGISTRY.clone()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tracing::info; + use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; + + use super::{AdoptionLogRegistry, CoreAdoptionLogLayer}; + + #[test] + fn captures_only_events_inside_adoption_span() { + let registry = AdoptionLogRegistry::default(); + let layer = CoreAdoptionLogLayer::new(registry.clone()); + let subscriber = Registry::default().with(layer); + let dispatch = tracing::Dispatch::new(subscriber); + + tracing::dispatcher::with_default(&dispatch, || { + let span = tracing::info_span!("proxy_adoption", adoption_id = "adoption-1"); + let _entered = span.enter(); + info!("captured"); + drop(_entered); + + info!("not captured"); + }); + + let logs = registry.take("adoption-1"); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("captured")); + } + + #[test] + fn separates_logs_for_concurrent_adoptions() { + let registry = AdoptionLogRegistry::default(); + let layer = CoreAdoptionLogLayer::new(registry.clone()); + let subscriber = Registry::default().with(layer); + let dispatch = tracing::Dispatch::new(subscriber); + let dispatch = Arc::new(dispatch); + + let mut handles = Vec::new(); + for (adoption_id, message) in [("a-1", "one"), ("a-2", "two")] { + let dispatch = Arc::clone(&dispatch); + handles.push(std::thread::spawn(move || { + tracing::dispatcher::with_default(&dispatch, || { + let span = tracing::info_span!("proxy_adoption", adoption_id = adoption_id); + let _entered = span.enter(); + info!("{message}"); + }); + })); + } + + for handle in handles { + handle.join().expect("log thread should finish cleanly"); + } + + let logs_a1 = registry.take("a-1"); + let logs_a2 = registry.take("a-2"); + + assert_eq!(logs_a1.len(), 1); + assert_eq!(logs_a2.len(), 1); + assert!(logs_a1[0].contains("one")); + assert!(logs_a2[0].contains("two")); + } +} diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 7c31e848bf..238251b170 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -37,8 +37,11 @@ use tonic::{ service::Interceptor, transport::{Certificate, ClientTlsConfig, Endpoint}, }; +use tracing::Instrument; +use uuid::Uuid; use crate::{ + adoption_logs, auth::{AdminOrSetupRole, SessionInfo}, enterprise::is_enterprise_license_active, version::{MIN_GATEWAY_VERSION, MIN_PROXY_VERSION}, @@ -152,13 +155,15 @@ fn set_step_message(next_step: SetupStep) -> Event { struct SetupFlow { last_step: SetupStep, + adoption_id: String, log_rx: tokio::sync::mpsc::UnboundedReceiver, } impl SetupFlow { - const fn new(log_rx: tokio::sync::mpsc::UnboundedReceiver) -> Self { + fn new(log_rx: tokio::sync::mpsc::UnboundedReceiver, adoption_id: String) -> Self { Self { last_step: SetupStep::CheckingConfiguration, + adoption_id, log_rx, } } @@ -169,7 +174,13 @@ impl SetupFlow { } fn error(&mut self, message: &str) -> Event { - let mut collected_logs = Vec::new(); + error!( + adoption_id = %self.adoption_id, + step = ?self.last_step, + "{message}" + ); + + let mut collected_logs = adoption_logs::take_logs(&self.adoption_id); while let Ok(log) = self.log_rx.try_recv() { collected_logs.push(log); } @@ -178,6 +189,7 @@ impl SetupFlow { } else { Some(collected_logs) }; + error_message(message, self.last_step, logs) } } @@ -193,9 +205,12 @@ pub async fn setup_proxy_tls_stream( proxy_control_tx: Option>>, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); + let adoption_id = Uuid::new_v4().to_string(); + adoption_logs::start_adoption(&adoption_id); + let adoption_span = tracing::info_span!("proxy_adoption", adoption_id = %adoption_id); - let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx); + let inner_stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, adoption_id.clone()); // check if tries to connect more then 1 proxy without active enterprise license if !is_enterprise_license_active() { @@ -442,32 +457,35 @@ pub async fn setup_proxy_tls_stream( let mut response = response_with_metadata.into_inner(); - let log_reader_task = tokio::spawn(async move { - while let Some(log_entry) = response.next().await { - match log_entry { - Ok(entry) => { - let level = entry - .level - .strip_prefix("Level(") - .and_then(|s| s.strip_suffix(")")) - .unwrap_or(&entry.level) - .to_uppercase(); - - let formatted = format!( - "{} {} {}: message={}", - entry.timestamp, level, entry.target, entry.message - ); - if log_tx.send(formatted).is_err() { + let log_reader_task = tokio::spawn( + async move { + while let Some(log_entry) = response.next().await { + match log_entry { + Ok(entry) => { + let level = entry + .level + .strip_prefix("Level(") + .and_then(|s| s.strip_suffix(")")) + .unwrap_or(&entry.level) + .to_uppercase(); + + let formatted = format!( + "{} {} {}: message={}", + entry.timestamp, level, entry.target, entry.message + ); + if log_tx.send(formatted).is_err() { + break; + } + } + Err(e) => { + let _ = log_tx.send(format!("Error reading log: {e}")); break; } } - Err(e) => { - let _ = log_tx.send(format!("Error reading log: {e}")); - break; - } } } - }); + .instrument(tracing::Span::current()), + ); // Create guard to ensure task is aborted on all exit paths let _log_task_guard = TaskGuard(log_reader_task); @@ -501,9 +519,7 @@ pub async fn setup_proxy_tls_stream( } }; - debug!( - "Received certificate signing request from Edge for hostname: {hostname}" - ); + debug!("Received certificate signing request from Edge for hostname: {hostname}"); // Step 5: Sign certificate yield Ok(flow.step(SetupStep::SigningCertificate)); @@ -634,6 +650,15 @@ pub async fn setup_proxy_tls_stream( // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = { + let _guard = adoption_span.enter(); + inner_stream.next().await + } { + yield item; + } + }; Sse::new(stream).keep_alive(KeepAlive::default()) } @@ -649,9 +674,12 @@ pub async fn setup_gateway_tls_stream( Extension(pool): Extension, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); + let adoption_id = Uuid::new_v4().to_string(); + adoption_logs::start_adoption(&adoption_id); + let adoption_span = tracing::info_span!("gateway_adoption", adoption_id = %adoption_id); - let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx); + let inner_stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, adoption_id.clone()); // check if tries to add more then 1 gateway to network without enterprise license if !is_enterprise_license_active() { @@ -887,33 +915,35 @@ pub async fn setup_gateway_tls_stream( let mut response = response_with_metadata.into_inner(); - let log_reader_task = tokio::spawn(async move { - while let Some(log_entry) = response.next().await { - match log_entry { - Ok(entry) => { - let level = entry.level - .strip_prefix("Level(") - .and_then(|s| s.strip_suffix(")")) - .unwrap_or(&entry.level) - .to_uppercase(); - - let formatted = format!( - "{} {level} {}: message={}", - entry.timestamp, - entry.target, - entry.message - ); - if log_tx.send(formatted).is_err() { + let log_reader_task = tokio::spawn( + async move { + while let Some(log_entry) = response.next().await { + match log_entry { + Ok(entry) => { + let level = entry + .level + .strip_prefix("Level(") + .and_then(|s| s.strip_suffix(")")) + .unwrap_or(&entry.level) + .to_uppercase(); + + let formatted = format!( + "{} {level} {}: message={}", + entry.timestamp, entry.target, entry.message + ); + if log_tx.send(formatted).is_err() { + break; + } + } + Err(e) => { + let _ = log_tx.send(format!("Error reading log: {e}")); break; } } - Err(e) => { - let _ = log_tx.send(format!("Error reading log: {e}")); - break; - } } } - }); + .instrument(tracing::Span::current()), + ); // Create guard to ensure task is aborted on all exit paths let _log_task_guard = TaskGuard(log_reader_task); @@ -1038,6 +1068,15 @@ pub async fn setup_gateway_tls_stream( // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = { + let _guard = adoption_span.enter(); + inner_stream.next().await + } { + yield item; + } + }; Sse::new(stream).keep_alive(KeepAlive::default()) } diff --git a/crates/defguard_core/src/lib.rs b/crates/defguard_core/src/lib.rs index 165cc0552c..657baa5613 100644 --- a/crates/defguard_core/src/lib.rs +++ b/crates/defguard_core/src/lib.rs @@ -170,6 +170,7 @@ use crate::{ version::IncompatibleComponents, }; +pub mod adoption_logs; pub mod appstate; pub mod auth; pub mod db; diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs new file mode 100644 index 0000000000..b0fac99de1 --- /dev/null +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -0,0 +1,68 @@ +use std::sync::Once; + +use defguard_core::adoption_logs::core_adoption_log_layer; +use reqwest::StatusCode; +use serde_json::Value; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use super::common::{make_test_client, setup_pool}; + +fn init_tracing_once() { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + tracing_subscriber::registry() + .with(core_adoption_log_layer()) + .try_init() + .ok(); + }); +} + +fn parse_sse_data_events(body: &str) -> Vec { + body.lines() + .filter_map(|line| line.strip_prefix("data: ")) + .map(|line| serde_json::from_str::(line).unwrap()) + .collect() +} + +#[sqlx::test] +async fn test_proxy_setup_error_includes_core_logs(_: PgPoolOptions, options: PgConnectOptions) { + init_tracing_once(); + + let pool = setup_pool(options).await; + + let (mut client, _) = make_test_client(pool).await; + client.login_user("admin", "pass123").await; + + let response = client + .get("/api/v1/proxy/setup/stream?ip_or_domain=bad%20host&grpc_port=50051&common_name=edge") + .send() + .await; + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await; + let events = parse_sse_data_events(&body); + + let error_event = events + .iter() + .find(|event| event.get("error") == Some(&Value::Bool(true))) + .unwrap(); + + let logs = error_event + .get("logs") + .and_then(Value::as_array) + .expect("expected `logs` array in proxy setup error event"); + assert!( + !logs.is_empty(), + "expected Core logs to be present in proxy setup error event" + ); + + let has_core_error = logs.iter().filter_map(Value::as_str).any(|line| { + line.contains("ERROR") && line.contains("defguard_core::handlers::component_setup") + }); + assert!( + has_core_error, + "expected at least one captured Core tracing line in error logs" + ); +} diff --git a/crates/defguard_core/tests/integration/api/mod.rs b/crates/defguard_core/tests/integration/api/mod.rs index 8a879d4f81..7989933e61 100644 --- a/crates/defguard_core/tests/integration/api/mod.rs +++ b/crates/defguard_core/tests/integration/api/mod.rs @@ -2,6 +2,7 @@ mod acl; mod api_tokens; mod auth; mod common; +mod component_setup; mod enrollment; mod enterprise_settings; mod forward_auth; From cb80b176e3a692c772bd03ddcc26adce429ec595 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 5 Mar 2026 11:46:56 +0100 Subject: [PATCH 02/18] simplify logs gathering --- crates/defguard/src/main.rs | 4 +- crates/defguard_core/src/adoption_logs.rs | 269 ------------------ .../src/handlers/component_setup.rs | 219 +++++++++++--- crates/defguard_core/src/lib.rs | 1 - .../tests/integration/api/component_setup.rs | 4 +- 5 files changed, 182 insertions(+), 315 deletions(-) delete mode 100644 crates/defguard_core/src/adoption_logs.rs diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index a5b3b9792a..3ac0bd45b5 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -15,7 +15,6 @@ use defguard_common::{ types::proxy::ProxyControlMessage, }; use defguard_core::{ - adoption_logs::core_adoption_log_layer, auth::failed_login::FailedLoginMap, db::AppEvent, enterprise::{ @@ -25,6 +24,7 @@ use defguard_core::{ }, events::{ApiEvent, BidiStreamEvent}, grpc::{GatewayEvent, WorkerState, run_grpc_server}, + handlers::component_setup::core_setup_log_layer, init_dev_env, init_vpn_location, run_web_server, utility_thread::run_utility_thread, version::IncompatibleComponents, @@ -53,7 +53,7 @@ async fn main() -> Result<(), anyhow::Error> { } let mut config = DefGuardConfig::new(); - let subscriber = tracing_subscriber::registry().with(core_adoption_log_layer()); + let subscriber = tracing_subscriber::registry().with(core_setup_log_layer()); defguard_version::tracing::with_version_formatters( &defguard_version::Version::parse(VERSION)?, &config.log_level, diff --git a/crates/defguard_core/src/adoption_logs.rs b/crates/defguard_core/src/adoption_logs.rs deleted file mode 100644 index 53ba840bb1..0000000000 --- a/crates/defguard_core/src/adoption_logs.rs +++ /dev/null @@ -1,269 +0,0 @@ -use std::{ - collections::{HashMap, VecDeque}, - sync::{Arc, LazyLock, Mutex}, - time::{Duration, Instant}, -}; - -use tracing::{Event, Subscriber}; -use tracing_subscriber::{ - Layer, - layer::Context, - registry::{LookupSpan, SpanRef}, -}; - -const MAX_LOG_LINES_PER_ADOPTION: usize = 200; -const ADOPTION_LOG_TTL: Duration = Duration::from_secs(20 * 60); - -#[derive(Clone, Default)] -pub struct AdoptionLogRegistry { - inner: Arc>>, -} - -#[derive(Default)] -struct AdoptionLogState { - lines: VecDeque, - touched_at: Option, -} - -impl AdoptionLogRegistry { - pub fn start(&self, adoption_id: &str) { - self.with_map(|map| { - self.cleanup_locked(map); - map.entry(adoption_id.to_owned()) - .or_insert_with(AdoptionLogState::default) - .touch(); - }); - } - - pub fn record(&self, adoption_id: &str, line: String) { - self.with_map(|map| { - self.cleanup_locked(map); - let state = map - .entry(adoption_id.to_owned()) - .or_insert_with(AdoptionLogState::default); - state.push_line(line); - }); - } - - pub fn take(&self, adoption_id: &str) -> Vec { - self.with_map(|map| { - self.cleanup_locked(map); - map.remove(adoption_id) - .map(AdoptionLogState::into_vec) - .unwrap_or_default() - }) - } - - fn with_map(&self, f: impl FnOnce(&mut HashMap) -> R) -> R { - let mut guard = self.inner.lock().expect("adoption log mutex poisoned"); - f(&mut guard) - } - - fn cleanup_locked(&self, map: &mut HashMap) { - let now = Instant::now(); - map.retain(|_, state| state.is_fresh(now)); - } -} - -impl AdoptionLogState { - fn touch(&mut self) { - self.touched_at = Some(Instant::now()); - } - - fn push_line(&mut self, line: String) { - self.touch(); - self.lines.push_back(line); - if self.lines.len() > MAX_LOG_LINES_PER_ADOPTION { - self.lines.pop_front(); - } - } - - fn is_fresh(&self, now: Instant) -> bool { - self.touched_at - .map(|touched_at| now.duration_since(touched_at) <= ADOPTION_LOG_TTL) - .unwrap_or(true) - } - - fn into_vec(self) -> Vec { - self.lines.into_iter().collect() - } -} - -#[derive(Clone)] -pub struct CoreAdoptionLogLayer { - registry: AdoptionLogRegistry, -} - -impl CoreAdoptionLogLayer { - #[must_use] - pub fn new(registry: AdoptionLogRegistry) -> Self { - Self { registry } - } -} - -#[derive(Clone)] -struct AdoptionIdExtension(String); - -impl Layer for CoreAdoptionLogLayer -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - fn on_new_span( - &self, - attrs: &tracing::span::Attributes<'_>, - id: &tracing::span::Id, - ctx: Context<'_, S>, - ) { - let mut visitor = FieldVisitor::default(); - attrs.record(&mut visitor); - - let Some(adoption_id) = visitor.adoption_id else { - return; - }; - - if let Some(span) = ctx.span(id) { - span.extensions_mut() - .insert(AdoptionIdExtension(adoption_id)); - } - } - - fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - let mut visitor = FieldVisitor::default(); - event.record(&mut visitor); - - let adoption_id = visitor - .adoption_id - .or_else(|| find_adoption_id_in_scope(&ctx, event)); - - let Some(adoption_id) = adoption_id else { - return; - }; - - let metadata = event.metadata(); - let message = visitor.message.unwrap_or_default(); - let line = format!("{} {}: {}", metadata.level(), metadata.target(), message); - self.registry.record(&adoption_id, line); - } -} - -fn find_adoption_id_in_scope(ctx: &Context<'_, S>, event: &Event<'_>) -> Option -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - let scope = ctx.event_scope(event)?; - scope.from_root().filter_map(adoption_id_from_span).last() -} - -fn adoption_id_from_span(span: SpanRef<'_, S>) -> Option -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - span.extensions() - .get::() - .map(|id| id.0.clone()) -} - -#[derive(Default)] -struct FieldVisitor { - adoption_id: Option, - message: Option, -} - -impl tracing::field::Visit for FieldVisitor { - fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - match field.name() { - "adoption_id" => { - self.adoption_id = Some(value.to_owned()); - } - "message" => { - self.message = Some(value.to_owned()); - } - _ => {} - } - } - - fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - let value = format!("{value:?}"); - self.record_str(field, &value); - } -} - -static REGISTRY: LazyLock = LazyLock::new(AdoptionLogRegistry::default); - -pub fn start_adoption(adoption_id: &str) { - REGISTRY.start(adoption_id); -} - -#[must_use] -pub fn take_logs(adoption_id: &str) -> Vec { - REGISTRY.take(adoption_id) -} - -#[must_use] -pub fn core_adoption_log_layer() -> CoreAdoptionLogLayer { - CoreAdoptionLogLayer::new(REGISTRY.clone()) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use tracing::info; - use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; - - use super::{AdoptionLogRegistry, CoreAdoptionLogLayer}; - - #[test] - fn captures_only_events_inside_adoption_span() { - let registry = AdoptionLogRegistry::default(); - let layer = CoreAdoptionLogLayer::new(registry.clone()); - let subscriber = Registry::default().with(layer); - let dispatch = tracing::Dispatch::new(subscriber); - - tracing::dispatcher::with_default(&dispatch, || { - let span = tracing::info_span!("proxy_adoption", adoption_id = "adoption-1"); - let _entered = span.enter(); - info!("captured"); - drop(_entered); - - info!("not captured"); - }); - - let logs = registry.take("adoption-1"); - assert_eq!(logs.len(), 1); - assert!(logs[0].contains("captured")); - } - - #[test] - fn separates_logs_for_concurrent_adoptions() { - let registry = AdoptionLogRegistry::default(); - let layer = CoreAdoptionLogLayer::new(registry.clone()); - let subscriber = Registry::default().with(layer); - let dispatch = tracing::Dispatch::new(subscriber); - let dispatch = Arc::new(dispatch); - - let mut handles = Vec::new(); - for (adoption_id, message) in [("a-1", "one"), ("a-2", "two")] { - let dispatch = Arc::clone(&dispatch); - handles.push(std::thread::spawn(move || { - tracing::dispatcher::with_default(&dispatch, || { - let span = tracing::info_span!("proxy_adoption", adoption_id = adoption_id); - let _entered = span.enter(); - info!("{message}"); - }); - })); - } - - for handle in handles { - handle.join().expect("log thread should finish cleanly"); - } - - let logs_a1 = registry.take("a-1"); - let logs_a2 = registry.take("a-2"); - - assert_eq!(logs_a1.len(), 1); - assert_eq!(logs_a2.len(), 1); - assert!(logs_a1[0].contains("one")); - assert!(logs_a2[0].contains("two")); - } -} diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 238251b170..17f31bdfc4 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -1,4 +1,11 @@ -use std::{convert::Infallible, time::Duration}; +use std::{ + cell::RefCell, + convert::Infallible, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context as TaskContext, Poll}, + time::Duration, +}; use axum::{ Extension, @@ -38,10 +45,14 @@ use tonic::{ transport::{Certificate, ClientTlsConfig, Endpoint}, }; use tracing::Instrument; -use uuid::Uuid; +use tracing::{Event as TracingEvent, Subscriber}; +use tracing_subscriber::{ + Layer, + layer::Context, + registry::{LookupSpan, SpanRef}, +}; use crate::{ - adoption_logs, auth::{AdminOrSetupRole, SessionInfo}, enterprise::is_enterprise_license_active, version::{MIN_GATEWAY_VERSION, MIN_PROXY_VERSION}, @@ -49,6 +60,143 @@ use crate::{ const TOKEN_CLIENT_ID: &str = "Defguard Core"; const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); +const MAX_CORE_LOG_LINES: usize = 200; + +#[derive(Clone)] +struct CoreLogBuffer(Arc>>); + +#[derive(Clone)] +pub struct CoreSetupLogLayer; + +thread_local! { + static ACTIVE_CORE_LOG_BUFFER: RefCell>>>> = const { RefCell::new(None) }; +} + +#[must_use] +pub fn core_setup_log_layer() -> CoreSetupLogLayer { + CoreSetupLogLayer +} + +struct SpanStream { + inner: Pin>, + span: tracing::Span, +} + +impl SpanStream { + fn new(inner: S, span: tracing::Span) -> Self { + Self { + inner: Box::pin(inner), + span, + } + } +} + +impl Stream for SpanStream +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + let span = self.span.clone(); + let _entered = span.enter(); + self.inner.as_mut().poll_next(cx) + } +} + +impl Layer for CoreSetupLogLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: Context<'_, S>, + ) { + if attrs.metadata().name() != "proxy_adoption" { + return; + } + + let Some(buffer) = ACTIVE_CORE_LOG_BUFFER.with(|active| active.borrow().clone()) else { + return; + }; + + if let Some(span) = ctx.span(id) { + span.extensions_mut().insert(CoreLogBuffer(buffer)); + } + } + + fn on_event(&self, event: &TracingEvent<'_>, ctx: Context<'_, S>) { + let Some(buffer) = find_log_buffer_in_scope(&ctx, event) else { + return; + }; + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + + let metadata = event.metadata(); + let message = visitor.message.unwrap_or_default(); + let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); + if guard.len() >= MAX_CORE_LOG_LINES { + guard.remove(0); + } + guard.push(format!( + "{} {}: {}", + metadata.level(), + metadata.target(), + message + )); + } +} + +fn with_active_core_log_buffer(buffer: Arc>>, f: impl FnOnce() -> R) -> R { + ACTIVE_CORE_LOG_BUFFER.with(|active| { + let previous = active.replace(Some(buffer)); + let result = f(); + active.replace(previous); + result + }) +} + +fn find_log_buffer_in_scope( + ctx: &Context<'_, S>, + event: &TracingEvent<'_>, +) -> Option>>> +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + let scope = ctx.event_scope(event)?; + scope.from_root().filter_map(log_buffer_from_span).last() +} + +fn log_buffer_from_span(span: SpanRef<'_, S>) -> Option>>> +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + span.extensions() + .get::() + .map(|buffer| buffer.0.clone()) +} + +#[derive(Default)] +struct MessageVisitor { + message: Option, +} + +impl tracing::field::Visit for MessageVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.message = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = Some(format!("{value:?}")); + } + } +} /// Guard that aborts a tokio task when dropped struct TaskGuard(tokio::task::JoinHandle<()>); @@ -155,15 +303,18 @@ fn set_step_message(next_step: SetupStep) -> Event { struct SetupFlow { last_step: SetupStep, - adoption_id: String, + core_log_buffer: Option>>>, log_rx: tokio::sync::mpsc::UnboundedReceiver, } impl SetupFlow { - fn new(log_rx: tokio::sync::mpsc::UnboundedReceiver, adoption_id: String) -> Self { + fn new( + log_rx: tokio::sync::mpsc::UnboundedReceiver, + core_log_buffer: Option>>>, + ) -> Self { Self { last_step: SetupStep::CheckingConfiguration, - adoption_id, + core_log_buffer, log_rx, } } @@ -174,13 +325,16 @@ impl SetupFlow { } fn error(&mut self, message: &str) -> Event { - error!( - adoption_id = %self.adoption_id, - step = ?self.last_step, - "{message}" - ); - - let mut collected_logs = adoption_logs::take_logs(&self.adoption_id); + error!(step = ?self.last_step, "{message}"); + + let mut collected_logs = self + .core_log_buffer + .as_ref() + .map(|buffer| { + let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); + std::mem::take(&mut *guard) + }) + .unwrap_or_default(); while let Ok(log) = self.log_rx.try_recv() { collected_logs.push(log); } @@ -205,12 +359,13 @@ pub async fn setup_proxy_tls_stream( proxy_control_tx: Option>>, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - let adoption_id = Uuid::new_v4().to_string(); - adoption_logs::start_adoption(&adoption_id); - let adoption_span = tracing::info_span!("proxy_adoption", adoption_id = %adoption_id); + let core_log_buffer = Arc::new(Mutex::new(Vec::new())); + let adoption_span = with_active_core_log_buffer(core_log_buffer.clone(), || { + tracing::info_span!("proxy_adoption") + }); let inner_stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, adoption_id.clone()); + let mut flow = SetupFlow::new(log_rx, Some(core_log_buffer.clone())); // check if tries to connect more then 1 proxy without active enterprise license if !is_enterprise_license_active() { @@ -364,6 +519,9 @@ pub async fn setup_proxy_tls_stream( request.ip_or_domain, request.grpc_port ); + info!("Info test"); + warn!("warn test"); + error!("error test"); let response_with_metadata = match tokio::time::timeout(CONNECTION_TIMEOUT, client.start(())).await { Ok(Ok(r)) => r, @@ -650,15 +808,7 @@ pub async fn setup_proxy_tls_stream( // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; - let stream = async_stream::stream! { - tokio::pin!(inner_stream); - while let Some(item) = { - let _guard = adoption_span.enter(); - inner_stream.next().await - } { - yield item; - } - }; + let stream = SpanStream::new(inner_stream, adoption_span); Sse::new(stream).keep_alive(KeepAlive::default()) } @@ -674,12 +824,8 @@ pub async fn setup_gateway_tls_stream( Extension(pool): Extension, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - let adoption_id = Uuid::new_v4().to_string(); - adoption_logs::start_adoption(&adoption_id); - let adoption_span = tracing::info_span!("gateway_adoption", adoption_id = %adoption_id); - - let inner_stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, adoption_id.clone()); + let stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, None); // check if tries to add more then 1 gateway to network without enterprise license if !is_enterprise_license_active() { @@ -1068,15 +1214,6 @@ pub async fn setup_gateway_tls_stream( // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; - let stream = async_stream::stream! { - tokio::pin!(inner_stream); - while let Some(item) = { - let _guard = adoption_span.enter(); - inner_stream.next().await - } { - yield item; - } - }; Sse::new(stream).keep_alive(KeepAlive::default()) } diff --git a/crates/defguard_core/src/lib.rs b/crates/defguard_core/src/lib.rs index 657baa5613..165cc0552c 100644 --- a/crates/defguard_core/src/lib.rs +++ b/crates/defguard_core/src/lib.rs @@ -170,7 +170,6 @@ use crate::{ version::IncompatibleComponents, }; -pub mod adoption_logs; pub mod appstate; pub mod auth; pub mod db; diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs index b0fac99de1..022e9aa8ef 100644 --- a/crates/defguard_core/tests/integration/api/component_setup.rs +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -1,6 +1,6 @@ use std::sync::Once; -use defguard_core::adoption_logs::core_adoption_log_layer; +use defguard_core::handlers::component_setup::core_setup_log_layer; use reqwest::StatusCode; use serde_json::Value; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; @@ -12,7 +12,7 @@ fn init_tracing_once() { static ONCE: Once = Once::new(); ONCE.call_once(|| { tracing_subscriber::registry() - .with(core_adoption_log_layer()) + .with(core_setup_log_layer()) .try_init() .ok(); }); From 21a2772c49d094b9c150c1e1808e6b1e7860ea9f Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 6 Mar 2026 14:12:07 +0100 Subject: [PATCH 03/18] task-local log buffer --- .../src/handlers/component_setup.rs | 348 ++++++------------ 1 file changed, 107 insertions(+), 241 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index dc168eb7bc..c701f3097c 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -1,9 +1,6 @@ use std::{ - cell::RefCell, convert::Infallible, - pin::Pin, sync::{Arc, Mutex}, - task::{Context as TaskContext, Poll}, time::Duration, }; @@ -46,11 +43,7 @@ use tonic::{ }; use tracing::Instrument; use tracing::{Event as TracingEvent, Subscriber}; -use tracing_subscriber::{ - Layer, - layer::Context, - registry::{LookupSpan, SpanRef}, -}; +use tracing_subscriber::{Layer, layer::Context}; use crate::{ auth::{AdminOrSetupRole, SessionInfo}, @@ -62,14 +55,11 @@ const TOKEN_CLIENT_ID: &str = "Defguard Core"; const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); const MAX_CORE_LOG_LINES: usize = 200; -#[derive(Clone)] -struct CoreLogBuffer(Arc>>); - #[derive(Clone)] pub struct CoreSetupLogLayer; -thread_local! { - static ACTIVE_CORE_LOG_BUFFER: RefCell>>>> = const { RefCell::new(None) }; +tokio::task_local! { + static CORE_SETUP_LOGS: Arc>>; } #[must_use] @@ -77,58 +67,12 @@ pub fn core_setup_log_layer() -> CoreSetupLogLayer { CoreSetupLogLayer } -struct SpanStream { - inner: Pin>, - span: tracing::Span, -} - -impl SpanStream { - fn new(inner: S, span: tracing::Span) -> Self { - Self { - inner: Box::pin(inner), - span, - } - } -} - -impl Stream for SpanStream -where - S: Stream, -{ - type Item = S::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { - let span = self.span.clone(); - let _entered = span.enter(); - self.inner.as_mut().poll_next(cx) - } -} - impl Layer for CoreSetupLogLayer where - S: Subscriber + for<'a> LookupSpan<'a>, + S: Subscriber, { - fn on_new_span( - &self, - attrs: &tracing::span::Attributes<'_>, - id: &tracing::span::Id, - ctx: Context<'_, S>, - ) { - if attrs.metadata().name() != "proxy_adoption" { - return; - } - - let Some(buffer) = ACTIVE_CORE_LOG_BUFFER.with(|active| active.borrow().clone()) else { - return; - }; - - if let Some(span) = ctx.span(id) { - span.extensions_mut().insert(CoreLogBuffer(buffer)); - } - } - - fn on_event(&self, event: &TracingEvent<'_>, ctx: Context<'_, S>) { - let Some(buffer) = find_log_buffer_in_scope(&ctx, event) else { + fn on_event(&self, event: &TracingEvent<'_>, _ctx: Context<'_, S>) { + let Some(buffer) = CORE_SETUP_LOGS.try_with(Clone::clone).ok() else { return; }; @@ -150,35 +94,6 @@ where } } -fn with_active_core_log_buffer(buffer: Arc>>, f: impl FnOnce() -> R) -> R { - ACTIVE_CORE_LOG_BUFFER.with(|active| { - let previous = active.replace(Some(buffer)); - let result = f(); - active.replace(previous); - result - }) -} - -fn find_log_buffer_in_scope( - ctx: &Context<'_, S>, - event: &TracingEvent<'_>, -) -> Option>>> -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - let scope = ctx.event_scope(event)?; - scope.from_root().filter_map(log_buffer_from_span).last() -} - -fn log_buffer_from_span(span: SpanRef<'_, S>) -> Option>>> -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - span.extensions() - .get::() - .map(|buffer| buffer.0.clone()) -} - #[derive(Default)] struct MessageVisitor { message: Option, @@ -360,21 +275,16 @@ pub async fn setup_proxy_tls_stream( ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); let core_log_buffer = Arc::new(Mutex::new(Vec::new())); - let adoption_span = with_active_core_log_buffer(core_log_buffer.clone(), || { - tracing::info_span!("proxy_adoption") - }); - + let inner_core_log_buffer = Arc::clone(&core_log_buffer); let inner_stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, Some(core_log_buffer.clone())); + let mut flow = SetupFlow::new(log_rx, Some(inner_core_log_buffer.clone())); - // check if tries to connect more then 1 proxy without active enterprise license if !is_enterprise_license_active() { match Proxy::list(&pool).await { Ok(current_proxies) => { if !current_proxies.is_empty() { yield Ok(flow.error( - "Enterprise license is required for connecting more \ - then one Edge.", + "Enterprise license is required for connecting more then one Edge.", )); return; } @@ -385,21 +295,13 @@ pub async fn setup_proxy_tls_stream( } } } + info!("License check passed"); - // Step 1: Check configuration yield Ok(flow.step(SetupStep::CheckingConfiguration)); - - match Proxy::find_by_address_port( - &pool, - &request.ip_or_domain, - i32::from(request.grpc_port), - ) - .await - { + match Proxy::find_by_address_port(&pool, &request.ip_or_domain, i32::from(request.grpc_port)).await { Ok(Some(proxy)) => { yield Ok(flow.error(&format!( - "An edge Proxy with address {}:{} is already \ - registered with name \"{}\".", + "An edge Proxy with address {}:{} is already registered with name \"{}\".", request.ip_or_domain, request.grpc_port, proxy.name ))); return; @@ -417,7 +319,6 @@ pub async fn setup_proxy_tls_stream( } let url_str = format!("http://{}:{}", request.ip_or_domain, request.grpc_port); - let url = match Url::parse(&url_str) { Ok(u) => u, Err(e) => { @@ -426,7 +327,7 @@ pub async fn setup_proxy_tls_stream( } }; - debug!("Successfully validated Edge address: {url_str}",); + debug!("Successfully validated Edge address: {url_str}"); let endpoint = match Endpoint::from_shared(url_str) { Ok(e) => e, @@ -481,11 +382,9 @@ pub async fn setup_proxy_tls_stream( } }; - // Step 2: Check availability yield Ok(flow.step(SetupStep::CheckingAvailability)); let version_clone = version.clone(); - let token = match Claims::new( defguard_common::auth::claims::ClaimsType::Gateway, url.to_string(), @@ -505,65 +404,51 @@ pub async fn setup_proxy_tls_stream( let version_interceptor = ClientVersionInterceptor::new(version); let auth_interceptor = AuthInterceptor::new(token); - - let mut client = ProxySetupClient::with_interceptor( - endpoint.connect_lazy(), - move |mut req: Request<()>| { - req = version_interceptor.clone().call(req)?; - auth_interceptor.clone().call(req) - }, - ); + let mut client = ProxySetupClient::with_interceptor(endpoint.connect_lazy(), move |mut req: Request<()>| { + req = version_interceptor.clone().call(req)?; + auth_interceptor.clone().call(req) + }); debug!( "Initiating connection to Edge at {}:{}", request.ip_or_domain, request.grpc_port ); - info!("Info test"); - warn!("warn test"); - error!("error test"); - let response_with_metadata = - match tokio::time::timeout(CONNECTION_TIMEOUT, client.start(())).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - match e.code() { - tonic::Code::Unavailable => { - let error_msg = e.to_string(); - if error_msg.contains("h2 protocol error") - || error_msg.contains("http2 error") - { - yield Ok(flow.error(&format!( - "Failed to connect to Edge at {}:{}: {}. This may indicate that \ - the Edge is already configured with TLS. Please check if the Edge \ - has already been set up.", - request.ip_or_domain, request.grpc_port, e - ))); - } else { - yield Ok(flow.error(&format!( - "Failed to connect to Edge at {}:{}. Please ensure the address \ - and port are correct and that the Edge component is running.", - request.ip_or_domain, request.grpc_port - ))); - } - } - _ => { - yield Ok(flow.error(&format!("Failed to connect to Edge: {e}"))); + let response_with_metadata = match tokio::time::timeout(CONNECTION_TIMEOUT, client.start(())).await { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + match e.code() { + tonic::Code::Unavailable => { + let error_msg = e.to_string(); + if error_msg.contains("h2 protocol error") || error_msg.contains("http2 error") { + yield Ok(flow.error(&format!( + "Failed to connect to Edge at {}:{}: {}. This may indicate that the Edge is already configured with TLS. Please check if the Edge has already been set up.", + request.ip_or_domain, request.grpc_port, e + ))); + } else { + yield Ok(flow.error(&format!( + "Failed to connect to Edge at {}:{}. Please ensure the address and port are correct and that the Edge component is running.", + request.ip_or_domain, request.grpc_port + ))); } } - return; - } - Err(_) => { - yield Ok(flow.error(&format!( - "Connection to Edge at {}:{} timed out after 10 seconds.", - request.ip_or_domain, request.grpc_port - ))); - return; + _ => { + yield Ok(flow.error(&format!("Failed to connect to Edge: {e}"))); + } } - }; + return; + } + Err(_) => { + yield Ok(flow.error(&format!( + "Connection to Edge at {}:{} timed out after 10 seconds.", + request.ip_or_domain, request.grpc_port + ))); + return; + } + }; debug!("Successfully connected to Edge"); - // Step 3: Check version yield Ok(flow.step(SetupStep::CheckingVersion)); let proxy_version = response_with_metadata @@ -580,8 +465,7 @@ pub async fn setup_proxy_tls_stream( if let Some(proxy_version) = proxy_version { if proxy_version < MIN_PROXY_VERSION { yield Ok(flow.error(&format!( - "Edge version {proxy_version} is older than Core version \ - {version_clone}. Please update the Edge component.", + "Edge version {proxy_version} is older than Core version {version_clone}. Please update the Edge component.", ))); return; } @@ -600,9 +484,7 @@ pub async fn setup_proxy_tls_stream( }; match serde_json::to_string(&response) { - Ok(body) => { - yield Ok(Event::default().data(body)); - } + Ok(body) => yield Ok(Event::default().data(body)), Err(e) => { yield Ok(flow.error(&format!("Failed to serialize version response: {e}"))); return; @@ -614,41 +496,40 @@ pub async fn setup_proxy_tls_stream( } let mut response = response_with_metadata.into_inner(); - + let spawn_log_buffer = inner_core_log_buffer.clone(); let log_reader_task = tokio::spawn( - async move { - while let Some(log_entry) = response.next().await { - match log_entry { - Ok(entry) => { - let level = entry - .level - .strip_prefix("Level(") - .and_then(|s| s.strip_suffix(")")) - .unwrap_or(&entry.level) - .to_uppercase(); - - let formatted = format!( - "{} {} {}: message={}", - entry.timestamp, level, entry.target, entry.message - ); - if log_tx.send(formatted).is_err() { + CORE_SETUP_LOGS + .scope(spawn_log_buffer, async move { + while let Some(log_entry) = response.next().await { + match log_entry { + Ok(entry) => { + let level = entry + .level + .strip_prefix("Level(") + .and_then(|s| s.strip_suffix(")")) + .unwrap_or(&entry.level) + .to_uppercase(); + + let formatted = format!( + "{} {} {}: message={}", + entry.timestamp, level, entry.target, entry.message + ); + if log_tx.send(formatted).is_err() { + break; + } + } + Err(e) => { + let _ = log_tx.send(format!("Error reading log: {e}")); break; } } - Err(e) => { - let _ = log_tx.send(format!("Error reading log: {e}")); - break; - } } - } - } - .instrument(tracing::Span::current()), + }) + .instrument(tracing::Span::current()), ); - // Create guard to ensure task is aborted on all exit paths let _log_task_guard = TaskGuard(log_reader_task); - // Step 4: Obtain CSR yield Ok(flow.step(SetupStep::ObtainingCsr)); let Some(hostname) = url.host_str() else { @@ -656,12 +537,7 @@ pub async fn setup_proxy_tls_stream( return; }; - let csr_response = match client - .get_csr(CertificateInfo { - cert_hostname: hostname.to_string(), - }) - .await - { + let csr_response = match client.get_csr(CertificateInfo { cert_hostname: hostname.to_string() }).await { Ok(r) => r.into_inner(), Err(e) => { yield Ok(flow.error(&format!("Failed to obtain CSR: {e}"))); @@ -679,25 +555,19 @@ pub async fn setup_proxy_tls_stream( debug!("Received certificate signing request from Edge for hostname: {hostname}"); - // Step 5: Sign certificate yield Ok(flow.step(SetupStep::SigningCertificate)); let settings = Settings::get_current_settings(); - let Some(ca_cert_der) = settings.ca_cert_der else { yield Ok(flow.error("CA certificate not found in settings")); return; }; - let Some(ca_key_pair) = settings.ca_key_der else { yield Ok(flow.error("CA key pair not found in settings")); return; }; - let ca = match defguard_certs::CertificateAuthority::from_cert_der_key_pair( - &ca_cert_der, - &ca_key_pair, - ) { + let ca = match defguard_certs::CertificateAuthority::from_cert_der_key_pair(&ca_cert_der, &ca_key_pair) { Ok(c) => c, Err(e) => { yield Ok(flow.error(&format!("Failed to create CA: {e}"))); @@ -717,31 +587,23 @@ pub async fn setup_proxy_tls_stream( debug!("Successfully signed certificate for Edge"); - // Step 6: Configure TLS yield Ok(flow.step(SetupStep::ConfiguringTls)); - let response = DerPayload { - der_data: cert.der().to_vec(), - }; - - if let Err(e) = client.send_cert(response).await { + if let Err(e) = client.send_cert(DerPayload { der_data: cert.der().to_vec() }).await { yield Ok(flow.error(&format!("Failed to send certificate: {e}"))); return; } debug!("Certificate successfully delivered to Edge"); - let defguard_certs::CertificateInfo { - not_after: expiry, - serial, - .. - } = match defguard_certs::CertificateInfo::from_der(cert.der()) { - Ok(dt) => dt, - Err(err) => { - yield Ok(flow.error(&format!("Failed to get certificate expiry: {err}"))); - return; - } - }; + let defguard_certs::CertificateInfo { not_after: expiry, serial, .. } = + match defguard_certs::CertificateInfo::from_der(cert.der()) { + Ok(dt) => dt, + Err(err) => { + yield Ok(flow.error(&format!("Failed to get certificate expiry: {err}"))); + return; + } + }; debug!("Certificate expiry date determined: {expiry}"); @@ -751,7 +613,6 @@ pub async fn setup_proxy_tls_stream( i32::from(request.grpc_port), &session.user.fullname(), ); - proxy.certificate = Some(serial); proxy.certificate_expiry = Some(expiry); @@ -768,11 +629,9 @@ pub async fn setup_proxy_tls_stream( request.common_name, proxy.id ); debug!("Establishing connection to newly configured Edge"); + if let Some(proxy_control_tx) = proxy_control_tx { - if let Err(err) = proxy_control_tx - .send(ProxyControlMessage::StartConnection(proxy.id)) - .await - { + if let Err(err) = proxy_control_tx.send(ProxyControlMessage::StartConnection(proxy.id)).await { yield Ok(flow.error(&format!( "Failed send message to connect to Edge after setup: {err}" ))); @@ -784,31 +643,38 @@ pub async fn setup_proxy_tls_stream( debug!("Edge setup completed successfully"); - { - match Wizard::get(&pool).await { - Ok(wizard) => { + match Wizard::get(&pool).await { + Ok(wizard) => { if !wizard.completed { let state = InitialSetupState { step: InitialSetupStep::Confirmation, }; - if let Err(err) = state.save(&pool).await { - yield Ok(flow.error(&format!("Failed to update setup step in wizard: {err}"))); - return; - } - debug!("Initial setup step advanced to 'Confirmation'"); + if let Err(err) = state.save(&pool).await { + yield Ok(flow.error(&format!("Failed to update setup step in wizard: {err}"))); + return; } - } - Err(err) => { - yield Ok(flow.error(&format!("Failed to fetch wizard state: {err}"))); - return; + debug!("Initial setup step advanced to 'Confirmation'"); } } + Err(err) => { + yield Ok(flow.error(&format!("Failed to fetch wizard state: {err}"))); + return; + } } - // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; - let stream = SpanStream::new(inner_stream, adoption_span); + + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = CORE_SETUP_LOGS + .scope(core_log_buffer.clone(), inner_stream.next()) + .instrument(tracing::info_span!("proxy_adoption")) + .await + { + yield item; + } + }; Sse::new(stream).keep_alive(KeepAlive::default()) } From 59f58e9aaea69d1e7ce188326769021afdceaa48 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 6 Mar 2026 16:54:07 +0100 Subject: [PATCH 04/18] separate module from setup tracing --- crates/defguard/src/main.rs | 2 +- .../src/handlers/component_setup.rs | 68 +----------------- crates/defguard_core/src/lib.rs | 1 + crates/defguard_core/src/setup_logs.rs | 71 +++++++++++++++++++ .../tests/integration/api/component_setup.rs | 2 +- 5 files changed, 77 insertions(+), 67 deletions(-) create mode 100644 crates/defguard_core/src/setup_logs.rs diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index eca1efb71b..e57e2c4097 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -24,8 +24,8 @@ use defguard_core::{ }, events::{ApiEvent, BidiStreamEvent}, grpc::{GatewayEvent, WorkerState, run_grpc_server}, - handlers::component_setup::core_setup_log_layer, init_dev_env, init_vpn_location, run_web_server, + setup_logs::core_setup_log_layer, utility_thread::run_utility_thread, version::IncompatibleComponents, }; diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index c701f3097c..2bcf441c30 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -42,76 +42,16 @@ use tonic::{ transport::{Certificate, ClientTlsConfig, Endpoint}, }; use tracing::Instrument; -use tracing::{Event as TracingEvent, Subscriber}; -use tracing_subscriber::{Layer, layer::Context}; use crate::{ auth::{AdminOrSetupRole, SessionInfo}, enterprise::is_enterprise_license_active, + setup_logs::scope_setup_logs, version::{MIN_GATEWAY_VERSION, MIN_PROXY_VERSION}, }; const TOKEN_CLIENT_ID: &str = "Defguard Core"; const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); -const MAX_CORE_LOG_LINES: usize = 200; - -#[derive(Clone)] -pub struct CoreSetupLogLayer; - -tokio::task_local! { - static CORE_SETUP_LOGS: Arc>>; -} - -#[must_use] -pub fn core_setup_log_layer() -> CoreSetupLogLayer { - CoreSetupLogLayer -} - -impl Layer for CoreSetupLogLayer -where - S: Subscriber, -{ - fn on_event(&self, event: &TracingEvent<'_>, _ctx: Context<'_, S>) { - let Some(buffer) = CORE_SETUP_LOGS.try_with(Clone::clone).ok() else { - return; - }; - - let mut visitor = MessageVisitor::default(); - event.record(&mut visitor); - - let metadata = event.metadata(); - let message = visitor.message.unwrap_or_default(); - let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); - if guard.len() >= MAX_CORE_LOG_LINES { - guard.remove(0); - } - guard.push(format!( - "{} {}: {}", - metadata.level(), - metadata.target(), - message - )); - } -} - -#[derive(Default)] -struct MessageVisitor { - message: Option, -} - -impl tracing::field::Visit for MessageVisitor { - fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - if field.name() == "message" { - self.message = Some(value.to_owned()); - } - } - - fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - if field.name() == "message" { - self.message = Some(format!("{value:?}")); - } - } -} /// Guard that aborts a tokio task when dropped struct TaskGuard(tokio::task::JoinHandle<()>); @@ -498,8 +438,7 @@ pub async fn setup_proxy_tls_stream( let mut response = response_with_metadata.into_inner(); let spawn_log_buffer = inner_core_log_buffer.clone(); let log_reader_task = tokio::spawn( - CORE_SETUP_LOGS - .scope(spawn_log_buffer, async move { + scope_setup_logs(spawn_log_buffer, async move { while let Some(log_entry) = response.next().await { match log_entry { Ok(entry) => { @@ -667,8 +606,7 @@ pub async fn setup_proxy_tls_stream( let stream = async_stream::stream! { tokio::pin!(inner_stream); - while let Some(item) = CORE_SETUP_LOGS - .scope(core_log_buffer.clone(), inner_stream.next()) + while let Some(item) = scope_setup_logs(core_log_buffer.clone(), inner_stream.next()) .instrument(tracing::info_span!("proxy_adoption")) .await { diff --git a/crates/defguard_core/src/lib.rs b/crates/defguard_core/src/lib.rs index 362366d7d7..0197b5cc13 100644 --- a/crates/defguard_core/src/lib.rs +++ b/crates/defguard_core/src/lib.rs @@ -182,6 +182,7 @@ pub mod grpc; pub mod handlers; pub mod headers; pub mod location_management; +pub mod setup_logs; pub mod support; pub mod updates; pub mod user_management; diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs new file mode 100644 index 0000000000..5168ef817e --- /dev/null +++ b/crates/defguard_core/src/setup_logs.rs @@ -0,0 +1,71 @@ +use std::sync::{Arc, Mutex}; + +use tracing::{Event as TracingEvent, Subscriber}; +use tracing_subscriber::{Layer, layer::Context}; + +const MAX_CORE_LOG_LINES: usize = 200; + +#[derive(Clone)] +pub struct CoreSetupLogLayer; + +tokio::task_local! { + static CORE_SETUP_LOGS: Arc>>; +} + +#[must_use] +pub fn core_setup_log_layer() -> CoreSetupLogLayer { + CoreSetupLogLayer +} + +pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T +where + F: std::future::Future, +{ + CORE_SETUP_LOGS.scope(buffer, future).await +} + +impl Layer for CoreSetupLogLayer +where + S: Subscriber, +{ + fn on_event(&self, event: &TracingEvent<'_>, _ctx: Context<'_, S>) { + let Some(buffer) = CORE_SETUP_LOGS.try_with(Clone::clone).ok() else { + return; + }; + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + + let metadata = event.metadata(); + let message = visitor.message.unwrap_or_default(); + let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); + if guard.len() >= MAX_CORE_LOG_LINES { + guard.remove(0); + } + guard.push(format!( + "{} {}: {}", + metadata.level(), + metadata.target(), + message + )); + } +} + +#[derive(Default)] +struct MessageVisitor { + message: Option, +} + +impl tracing::field::Visit for MessageVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.message = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = Some(format!("{value:?}")); + } + } +} diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs index 022e9aa8ef..6e567c6e54 100644 --- a/crates/defguard_core/tests/integration/api/component_setup.rs +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -1,6 +1,6 @@ use std::sync::Once; -use defguard_core::handlers::component_setup::core_setup_log_layer; +use defguard_core::setup_logs::core_setup_log_layer; use reqwest::StatusCode; use serde_json::Value; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; From e823227cbfd12750e85146d0ffb80be7f2a6a897 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 11:02:25 +0100 Subject: [PATCH 05/18] non-optional log buffer --- .../src/handlers/component_setup.rs | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 2bcf441c30..58b8e6783b 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -158,18 +158,18 @@ fn set_step_message(next_step: SetupStep) -> Event { struct SetupFlow { last_step: SetupStep, - core_log_buffer: Option>>>, + log_buffer: Arc>>, log_rx: tokio::sync::mpsc::UnboundedReceiver, } impl SetupFlow { fn new( log_rx: tokio::sync::mpsc::UnboundedReceiver, - core_log_buffer: Option>>>, + log_buffer: Arc>>, ) -> Self { Self { last_step: SetupStep::CheckingConfiguration, - core_log_buffer, + log_buffer, log_rx, } } @@ -182,14 +182,13 @@ impl SetupFlow { fn error(&mut self, message: &str) -> Event { error!(step = ?self.last_step, "{message}"); - let mut collected_logs = self - .core_log_buffer - .as_ref() - .map(|buffer| { - let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); - std::mem::take(&mut *guard) - }) - .unwrap_or_default(); + let mut collected_logs = { + let mut guard = self + .log_buffer + .lock() + .expect("core log buffer mutex poisoned"); + std::mem::take(&mut *guard) + }; while let Ok(log) = self.log_rx.try_recv() { collected_logs.push(log); } @@ -214,10 +213,10 @@ pub async fn setup_proxy_tls_stream( proxy_control_tx: Option>>, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - let core_log_buffer = Arc::new(Mutex::new(Vec::new())); - let inner_core_log_buffer = Arc::clone(&core_log_buffer); + let log_buffer = Arc::new(Mutex::new(Vec::new())); + let inner_log_buffer = Arc::clone(&log_buffer); let inner_stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, Some(inner_core_log_buffer.clone())); + let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); if !is_enterprise_license_active() { match Proxy::list(&pool).await { @@ -436,7 +435,7 @@ pub async fn setup_proxy_tls_stream( } let mut response = response_with_metadata.into_inner(); - let spawn_log_buffer = inner_core_log_buffer.clone(); + let spawn_log_buffer = inner_log_buffer.clone(); let log_reader_task = tokio::spawn( scope_setup_logs(spawn_log_buffer, async move { while let Some(log_entry) = response.next().await { @@ -606,7 +605,7 @@ pub async fn setup_proxy_tls_stream( let stream = async_stream::stream! { tokio::pin!(inner_stream); - while let Some(item) = scope_setup_logs(core_log_buffer.clone(), inner_stream.next()) + while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) .instrument(tracing::info_span!("proxy_adoption")) .await { @@ -628,8 +627,9 @@ pub async fn setup_gateway_tls_stream( Extension(pool): Extension, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); + let log_buffer = Arc::new(Mutex::new(Vec::new())); let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, None); + let mut flow = SetupFlow::new(log_rx, log_buffer); // check if tries to add more then 1 gateway to network without enterprise license if !is_enterprise_license_active() { From 1b65027ad237bd7fdb505e9dba35a57ae5573476 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 11:08:13 +0100 Subject: [PATCH 06/18] simplify log line --- crates/defguard_core/src/handlers/component_setup.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 58b8e6783b..ec05354425 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -180,13 +180,13 @@ impl SetupFlow { } fn error(&mut self, message: &str) -> Event { - error!(step = ?self.last_step, "{message}"); + error!("{message}"); let mut collected_logs = { let mut guard = self .log_buffer .lock() - .expect("core log buffer mutex poisoned"); + .expect("log buffer mutex poisoned"); std::mem::take(&mut *guard) }; while let Ok(log) = self.log_rx.try_recv() { From 26fc5d46934f085b64b1841379097b4db688a41b Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 11:23:58 +0100 Subject: [PATCH 07/18] comments & logging tweaks --- crates/defguard_core/src/handlers/component_setup.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index ec05354425..591110164c 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -218,12 +218,13 @@ pub async fn setup_proxy_tls_stream( let inner_stream = async_stream::stream! { let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); + // check if tries to connect more then 1 proxy without active enterprise license if !is_enterprise_license_active() { match Proxy::list(&pool).await { Ok(current_proxies) => { if !current_proxies.is_empty() { yield Ok(flow.error( - "Enterprise license is required for connecting more then one Edge.", + "Enterprise license is required for connecting more than one Edge.", )); return; } @@ -234,7 +235,8 @@ pub async fn setup_proxy_tls_stream( } } } - info!("License check passed"); + + debug!("License check passed"); yield Ok(flow.step(SetupStep::CheckingConfiguration)); match Proxy::find_by_address_port(&pool, &request.ip_or_domain, i32::from(request.grpc_port)).await { @@ -257,6 +259,8 @@ pub async fn setup_proxy_tls_stream( } } + debug!("License check passed"); + let url_str = format!("http://{}:{}", request.ip_or_domain, request.grpc_port); let url = match Url::parse(&url_str) { Ok(u) => u, @@ -579,7 +583,7 @@ pub async fn setup_proxy_tls_stream( debug!("Edge control channel not available; skipping connection initiation"); } - debug!("Edge setup completed successfully"); + info!("Edge setup completed successfully"); match Wizard::get(&pool).await { Ok(wizard) => { From 8ab3ed3fe87e5bcc1b621e3f968a432cb4a4ce20 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 12:29:46 +0100 Subject: [PATCH 08/18] always log component_setup endpoint with debug level --- crates/defguard/src/main.rs | 6 +++++- crates/defguard_core/src/handlers/component_setup.rs | 5 +---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index e57e2c4097..ab18e5eb8d 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -55,11 +55,15 @@ async fn main() -> Result<(), anyhow::Error> { dotenvy::dotenv().ok(); } let mut config = DefGuardConfig::new(); + let log_filter = format!( + "{},defguard_core::handlers::component_setup=debug", + config.log_level + ); let subscriber = tracing_subscriber::registry().with(core_setup_log_layer()); defguard_version::tracing::with_version_formatters( &defguard_version::Version::parse(VERSION)?, - &config.log_level, + &log_filter, subscriber, ) .init(); diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 591110164c..84bc35be5b 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -183,10 +183,7 @@ impl SetupFlow { error!("{message}"); let mut collected_logs = { - let mut guard = self - .log_buffer - .lock() - .expect("log buffer mutex poisoned"); + let mut guard = self.log_buffer.lock().expect("log buffer mutex poisoned"); std::mem::take(&mut *guard) }; while let Ok(log) = self.log_rx.try_recv() { From b42cad515e8f7c5651b17834eb314913d6bc6777 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 12:53:03 +0100 Subject: [PATCH 09/18] adoption logs for gateway --- .../src/handlers/component_setup.rs | 20 +++++++-- .../tests/integration/api/component_setup.rs | 42 +++++++++++++++++++ .../GatewaySetupPage/GatewaySetupPage.tsx | 14 ++++++- 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 84bc35be5b..3fbc18738f 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -629,8 +629,9 @@ pub async fn setup_gateway_tls_stream( ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); let log_buffer = Arc::new(Mutex::new(Vec::new())); - let stream = async_stream::stream! { - let mut flow = SetupFlow::new(log_rx, log_buffer); + let inner_log_buffer = Arc::clone(&log_buffer); + let inner_stream = async_stream::stream! { + let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); // check if tries to add more then 1 gateway to network without enterprise license if !is_enterprise_license_active() { @@ -866,8 +867,9 @@ pub async fn setup_gateway_tls_stream( let mut response = response_with_metadata.into_inner(); + let spawn_log_buffer = inner_log_buffer.clone(); let log_reader_task = tokio::spawn( - async move { + scope_setup_logs(spawn_log_buffer, async move { while let Some(log_entry) = response.next().await { match log_entry { Ok(entry) => { @@ -892,7 +894,7 @@ pub async fn setup_gateway_tls_stream( } } } - } + }) .instrument(tracing::Span::current()), ); @@ -1020,5 +1022,15 @@ pub async fn setup_gateway_tls_stream( yield Ok(flow.step(SetupStep::Done)); }; + let stream = async_stream::stream! { + tokio::pin!(inner_stream); + while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) + .instrument(tracing::info_span!("gateway_adoption")) + .await + { + yield item; + } + }; + Sse::new(stream).keep_alive(KeepAlive::default()) } diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs index 6e567c6e54..09dcf7fe21 100644 --- a/crates/defguard_core/tests/integration/api/component_setup.rs +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -66,3 +66,45 @@ async fn test_proxy_setup_error_includes_core_logs(_: PgPoolOptions, options: Pg "expected at least one captured Core tracing line in error logs" ); } + +#[sqlx::test] +async fn test_gateway_setup_error_includes_core_logs(_: PgPoolOptions, options: PgConnectOptions) { + init_tracing_once(); + + let pool = setup_pool(options).await; + + let (mut client, _) = make_test_client(pool).await; + client.login_user("admin", "pass123").await; + + let response = client + .get("/api/v1/network/1/gateways/setup?ip_or_domain=bad%20host&grpc_port=50051&common_name=gateway") + .send() + .await; + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await; + let events = parse_sse_data_events(&body); + + let error_event = events + .iter() + .find(|event| event.get("error") == Some(&Value::Bool(true))) + .unwrap(); + + let logs = error_event + .get("logs") + .and_then(Value::as_array) + .expect("expected `logs` array in gateway setup error event"); + assert!( + !logs.is_empty(), + "expected Core logs to be present in gateway setup error event" + ); + + let has_core_error = logs.iter().filter_map(Value::as_str).any(|line| { + line.contains("ERROR") && line.contains("defguard_core::handlers::component_setup") + }); + assert!( + has_core_error, + "expected at least one captured Core tracing line in error logs" + ); +} diff --git a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx index f19acfe340..ee38e25259 100644 --- a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx +++ b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx @@ -1,6 +1,6 @@ import './style.scss'; import { useNavigate } from '@tanstack/react-router'; -import { type ReactNode, useMemo } from 'react'; +import { type ReactNode, useEffect, useMemo } from 'react'; import { m } from '../../paraglide/messages'; import { Controls } from '../../shared/components/Controls/Controls'; import type { WizardPageStep } from '../../shared/components/wizard/types'; @@ -19,9 +19,19 @@ import { useGatewayWizardStore } from './useGatewayWizardStore'; export const GatewaySetupPage = () => { const activeStep = useGatewayWizardStore((s) => s.activeStep); const isOnWelcomePage = useGatewayWizardStore((s) => s.isOnWelcomePage); + const networkId = useGatewayWizardStore((s) => s.network_id); const setIsOnWelcomePage = useGatewayWizardStore((s) => s.setisOnWelcomePage); const navigate = useNavigate(); + useEffect(() => { + if (networkId !== null) { + return; + } + + useGatewayWizardStore.getState().reset(); + void navigate({ to: '/locations', replace: true }); + }, [navigate, networkId]); + const stepsConfig = useMemo( (): Record => ({ deployGateway: { @@ -77,6 +87,7 @@ export const GatewaySetupPage = () => { ); return ( + networkId === null ? null : ( { @@ -109,5 +120,6 @@ export const GatewaySetupPage = () => { > {stepsComponents[activeStep]} + ) ); }; From 02790d0c0e9690892cf33f263e38aa756a62f011 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 12:59:33 +0100 Subject: [PATCH 10/18] fix clippy issue --- tools/defguard_generator/src/vpn_session_stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/defguard_generator/src/vpn_session_stats.rs b/tools/defguard_generator/src/vpn_session_stats.rs index de6b4e093e..ff9519b84e 100644 --- a/tools/defguard_generator/src/vpn_session_stats.rs +++ b/tools/defguard_generator/src/vpn_session_stats.rs @@ -70,7 +70,7 @@ pub async fn generate_vpn_session_stats( let devices = prepare_user_devices(&pool, &mut rng, &user, config.devices_per_user as usize).await?; - let mut used_ips = location.all_used_ips_for_network(&mut *transaction).await?; + let mut used_ips = location.all_used_ips_for_network(&mut transaction).await?; // assign devices to the network if not already assigned for device in &devices { if WireguardNetworkDevice::find(&mut *transaction, device.id, location.id) From 2d7b280e2daa39c964bdc116ff36c0cf2f89e773 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 13:02:45 +0100 Subject: [PATCH 11/18] fix log message --- crates/defguard_core/src/handlers/component_setup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index 3fbc18738f..c595f9efb0 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -256,7 +256,7 @@ pub async fn setup_proxy_tls_stream( } } - debug!("License check passed"); + debug!("Configuration check passed"); let url_str = format!("http://{}:{}", request.ip_or_domain, request.grpc_port); let url = match Url::parse(&url_str) { From dd899a37f204dd87c236524d80407554ffce1b92 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 13:11:30 +0100 Subject: [PATCH 12/18] step comments --- crates/defguard_core/src/handlers/component_setup.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index c595f9efb0..d22d9018fd 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -235,6 +235,7 @@ pub async fn setup_proxy_tls_stream( debug!("License check passed"); + // Step 1: Check configuration yield Ok(flow.step(SetupStep::CheckingConfiguration)); match Proxy::find_by_address_port(&pool, &request.ip_or_domain, i32::from(request.grpc_port)).await { Ok(Some(proxy)) => { @@ -322,6 +323,7 @@ pub async fn setup_proxy_tls_stream( } }; + // Step 2: Check availability yield Ok(flow.step(SetupStep::CheckingAvailability)); let version_clone = version.clone(); @@ -389,6 +391,7 @@ pub async fn setup_proxy_tls_stream( debug!("Successfully connected to Edge"); + // Step 3: Check version yield Ok(flow.step(SetupStep::CheckingVersion)); let proxy_version = response_with_metadata @@ -469,6 +472,7 @@ pub async fn setup_proxy_tls_stream( let _log_task_guard = TaskGuard(log_reader_task); + // Step 4: Obtain CSR yield Ok(flow.step(SetupStep::ObtainingCsr)); let Some(hostname) = url.host_str() else { @@ -494,6 +498,7 @@ pub async fn setup_proxy_tls_stream( debug!("Received certificate signing request from Edge for hostname: {hostname}"); + // Step 5: Sign certificate yield Ok(flow.step(SetupStep::SigningCertificate)); let settings = Settings::get_current_settings(); @@ -526,6 +531,7 @@ pub async fn setup_proxy_tls_stream( debug!("Successfully signed certificate for Edge"); + // Step 6: Configure TLS yield Ok(flow.step(SetupStep::ConfiguringTls)); if let Err(e) = client.send_cert(DerPayload { der_data: cert.der().to_vec() }).await { @@ -601,6 +607,7 @@ pub async fn setup_proxy_tls_stream( } } + // Step 7: Done yield Ok(flow.step(SetupStep::Done)); }; From 11e8a02130b6cef7924af4e6ab333f59e6e0bd22 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 13:34:46 +0100 Subject: [PATCH 13/18] pnpm fix --- web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx index ee38e25259..5831091f8d 100644 --- a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx +++ b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx @@ -86,8 +86,7 @@ export const GatewaySetupPage = () => { ); - return ( - networkId === null ? null : ( + return networkId === null ? null : ( { @@ -120,6 +119,5 @@ export const GatewaySetupPage = () => { > {stepsComponents[activeStep]} - ) ); }; From 0a177ae066dabbe67ee59e602cf236d4f6490c5b Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 14:14:20 +0100 Subject: [PATCH 14/18] remove unneccesary constructor --- crates/defguard/src/main.rs | 4 ++-- crates/defguard_core/src/setup_logs.rs | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/defguard/src/main.rs b/crates/defguard/src/main.rs index ab18e5eb8d..8b25fe36d8 100644 --- a/crates/defguard/src/main.rs +++ b/crates/defguard/src/main.rs @@ -25,7 +25,7 @@ use defguard_core::{ events::{ApiEvent, BidiStreamEvent}, grpc::{GatewayEvent, WorkerState, run_grpc_server}, init_dev_env, init_vpn_location, run_web_server, - setup_logs::core_setup_log_layer, + setup_logs::CoreSetupLogLayer, utility_thread::run_utility_thread, version::IncompatibleComponents, }; @@ -60,7 +60,7 @@ async fn main() -> Result<(), anyhow::Error> { config.log_level ); - let subscriber = tracing_subscriber::registry().with(core_setup_log_layer()); + let subscriber = tracing_subscriber::registry().with(CoreSetupLogLayer); defguard_version::tracing::with_version_formatters( &defguard_version::Version::parse(VERSION)?, &log_filter, diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs index 5168ef817e..6e950ddf85 100644 --- a/crates/defguard_core/src/setup_logs.rs +++ b/crates/defguard_core/src/setup_logs.rs @@ -12,11 +12,6 @@ tokio::task_local! { static CORE_SETUP_LOGS: Arc>>; } -#[must_use] -pub fn core_setup_log_layer() -> CoreSetupLogLayer { - CoreSetupLogLayer -} - pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T where F: std::future::Future, From aedf1e8d4c4ce082979778c5f7998e0cce258586 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 14:25:15 +0100 Subject: [PATCH 15/18] add tests --- crates/defguard_core/src/setup_logs.rs | 2 +- .../tests/integration/api/component_setup.rs | 71 ++++++++++++++++++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs index 6e950ddf85..e6526e69cb 100644 --- a/crates/defguard_core/src/setup_logs.rs +++ b/crates/defguard_core/src/setup_logs.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex}; use tracing::{Event as TracingEvent, Subscriber}; use tracing_subscriber::{Layer, layer::Context}; -const MAX_CORE_LOG_LINES: usize = 200; +pub const MAX_CORE_LOG_LINES: usize = 200; #[derive(Clone)] pub struct CoreSetupLogLayer; diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs index 09dcf7fe21..4355567068 100644 --- a/crates/defguard_core/tests/integration/api/component_setup.rs +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -1,9 +1,11 @@ use std::sync::Once; -use defguard_core::setup_logs::core_setup_log_layer; +use defguard_core::setup_logs::{CoreSetupLogLayer, MAX_CORE_LOG_LINES, scope_setup_logs}; use reqwest::StatusCode; use serde_json::Value; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +use std::sync::{Arc, Mutex}; +use tracing::{debug, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use super::common::{make_test_client, setup_pool}; @@ -12,7 +14,7 @@ fn init_tracing_once() { static ONCE: Once = Once::new(); ONCE.call_once(|| { tracing_subscriber::registry() - .with(core_setup_log_layer()) + .with(CoreSetupLogLayer) .try_init() .ok(); }); @@ -25,6 +27,17 @@ fn parse_sse_data_events(body: &str) -> Vec { .collect() } +fn read_logs(buffer: &Arc>>) -> Vec { + buffer + .lock() + .expect("test log buffer mutex poisoned") + .clone() +} + +async fn log_from_nested_function() { + info!("nested awaited log"); +} + #[sqlx::test] async fn test_proxy_setup_error_includes_core_logs(_: PgPoolOptions, options: PgConnectOptions) { init_tracing_once(); @@ -108,3 +121,57 @@ async fn test_gateway_setup_error_includes_core_logs(_: PgPoolOptions, options: "expected at least one captured Core tracing line in error logs" ); } + +#[tokio::test] +async fn scope_setup_logs_captures_logs_inside_scope() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(Vec::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + info!("captured in setup scope"); + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("captured in setup scope")); +} + +#[tokio::test] +async fn nested_awaited_calls_are_captured() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(Vec::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + log_from_nested_function().await; + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("nested awaited log")); +} + +#[tokio::test] +async fn buffer_is_bounded_to_max_core_log_lines() { + init_tracing_once(); + + let buffer = Arc::new(Mutex::new(Vec::new())); + + scope_setup_logs(Arc::clone(&buffer), async { + for idx in 0..(MAX_CORE_LOG_LINES + 5) { + debug!("bounded log line {idx}"); + } + }) + .await; + + let logs = read_logs(&buffer); + assert_eq!(logs.len(), MAX_CORE_LOG_LINES); + assert!(logs[0].contains("bounded log line 5")); + assert!( + logs[MAX_CORE_LOG_LINES - 1] + .contains(&format!("bounded log line {}", MAX_CORE_LOG_LINES + 4)) + ); +} From 6a66ccdb97ff9f6d32aa1b310edea9b46f7c378f Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 18:27:09 +0100 Subject: [PATCH 16/18] remove location null-check - solved in dev branch --- .../pages/GatewaySetupPage/GatewaySetupPage.tsx | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx index 5831091f8d..f19acfe340 100644 --- a/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx +++ b/web/src/pages/GatewaySetupPage/GatewaySetupPage.tsx @@ -1,6 +1,6 @@ import './style.scss'; import { useNavigate } from '@tanstack/react-router'; -import { type ReactNode, useEffect, useMemo } from 'react'; +import { type ReactNode, useMemo } from 'react'; import { m } from '../../paraglide/messages'; import { Controls } from '../../shared/components/Controls/Controls'; import type { WizardPageStep } from '../../shared/components/wizard/types'; @@ -19,19 +19,9 @@ import { useGatewayWizardStore } from './useGatewayWizardStore'; export const GatewaySetupPage = () => { const activeStep = useGatewayWizardStore((s) => s.activeStep); const isOnWelcomePage = useGatewayWizardStore((s) => s.isOnWelcomePage); - const networkId = useGatewayWizardStore((s) => s.network_id); const setIsOnWelcomePage = useGatewayWizardStore((s) => s.setisOnWelcomePage); const navigate = useNavigate(); - useEffect(() => { - if (networkId !== null) { - return; - } - - useGatewayWizardStore.getState().reset(); - void navigate({ to: '/locations', replace: true }); - }, [navigate, networkId]); - const stepsConfig = useMemo( (): Record => ({ deployGateway: { @@ -86,7 +76,7 @@ export const GatewaySetupPage = () => { ); - return networkId === null ? null : ( + return ( { From e34e8f6ac474197a966ad2101f39c8c0ef742305 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 18:56:29 +0100 Subject: [PATCH 17/18] use VecDeque for log buffer instead of Vec --- .../src/handlers/component_setup.rs | 22 ++++++++++++------- crates/defguard_core/src/setup_logs.rs | 17 +++++++++----- .../tests/integration/api/component_setup.rs | 18 +++++++++------ 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/crates/defguard_core/src/handlers/component_setup.rs b/crates/defguard_core/src/handlers/component_setup.rs index d22d9018fd..378b962ce2 100644 --- a/crates/defguard_core/src/handlers/component_setup.rs +++ b/crates/defguard_core/src/handlers/component_setup.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, convert::Infallible, sync::{Arc, Mutex}, time::Duration, @@ -158,14 +159,14 @@ fn set_step_message(next_step: SetupStep) -> Event { struct SetupFlow { last_step: SetupStep, - log_buffer: Arc>>, + log_buffer: Arc>>, log_rx: tokio::sync::mpsc::UnboundedReceiver, } impl SetupFlow { fn new( log_rx: tokio::sync::mpsc::UnboundedReceiver, - log_buffer: Arc>>, + log_buffer: Arc>>, ) -> Self { Self { last_step: SetupStep::CheckingConfiguration, @@ -183,8 +184,11 @@ impl SetupFlow { error!("{message}"); let mut collected_logs = { - let mut guard = self.log_buffer.lock().expect("log buffer mutex poisoned"); - std::mem::take(&mut *guard) + let mut guard = self + .log_buffer + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + std::mem::take(&mut *guard).into_iter().collect::>() }; while let Ok(log) = self.log_rx.try_recv() { collected_logs.push(log); @@ -210,7 +214,7 @@ pub async fn setup_proxy_tls_stream( proxy_control_tx: Option>>, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - let log_buffer = Arc::new(Mutex::new(Vec::new())); + let log_buffer = Arc::new(Mutex::new(VecDeque::new())); let inner_log_buffer = Arc::clone(&log_buffer); let inner_stream = async_stream::stream! { let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); @@ -611,10 +615,11 @@ pub async fn setup_proxy_tls_stream( yield Ok(flow.step(SetupStep::Done)); }; + let adoption_span = tracing::info_span!("proxy_adoption"); let stream = async_stream::stream! { tokio::pin!(inner_stream); while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) - .instrument(tracing::info_span!("proxy_adoption")) + .instrument(adoption_span.clone()) .await { yield item; @@ -635,7 +640,7 @@ pub async fn setup_gateway_tls_stream( Extension(pool): Extension, ) -> Sse>> { let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); - let log_buffer = Arc::new(Mutex::new(Vec::new())); + let log_buffer = Arc::new(Mutex::new(VecDeque::new())); let inner_log_buffer = Arc::clone(&log_buffer); let inner_stream = async_stream::stream! { let mut flow = SetupFlow::new(log_rx, inner_log_buffer.clone()); @@ -1029,10 +1034,11 @@ pub async fn setup_gateway_tls_stream( yield Ok(flow.step(SetupStep::Done)); }; + let adoption_span = tracing::info_span!("gateway_adoption"); let stream = async_stream::stream! { tokio::pin!(inner_stream); while let Some(item) = scope_setup_logs(log_buffer.clone(), inner_stream.next()) - .instrument(tracing::info_span!("gateway_adoption")) + .instrument(adoption_span.clone()) .await { yield item; diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs index e6526e69cb..9b10df24b4 100644 --- a/crates/defguard_core/src/setup_logs.rs +++ b/crates/defguard_core/src/setup_logs.rs @@ -1,4 +1,7 @@ -use std::sync::{Arc, Mutex}; +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; use tracing::{Event as TracingEvent, Subscriber}; use tracing_subscriber::{Layer, layer::Context}; @@ -9,10 +12,10 @@ pub const MAX_CORE_LOG_LINES: usize = 200; pub struct CoreSetupLogLayer; tokio::task_local! { - static CORE_SETUP_LOGS: Arc>>; + static CORE_SETUP_LOGS: Arc>>; } -pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T +pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T where F: std::future::Future, { @@ -33,11 +36,13 @@ where let metadata = event.metadata(); let message = visitor.message.unwrap_or_default(); - let mut guard = buffer.lock().expect("core log buffer mutex poisoned"); + let Ok(mut guard) = buffer.lock() else { + return; + }; if guard.len() >= MAX_CORE_LOG_LINES { - guard.remove(0); + guard.pop_front(); } - guard.push(format!( + guard.push_back(format!( "{} {}: {}", metadata.level(), metadata.target(), diff --git a/crates/defguard_core/tests/integration/api/component_setup.rs b/crates/defguard_core/tests/integration/api/component_setup.rs index 4355567068..7a381d0ae7 100644 --- a/crates/defguard_core/tests/integration/api/component_setup.rs +++ b/crates/defguard_core/tests/integration/api/component_setup.rs @@ -1,10 +1,12 @@ -use std::sync::Once; +use std::{ + collections::VecDeque, + sync::{Arc, Mutex, Once}, +}; use defguard_core::setup_logs::{CoreSetupLogLayer, MAX_CORE_LOG_LINES, scope_setup_logs}; use reqwest::StatusCode; use serde_json::Value; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; -use std::sync::{Arc, Mutex}; use tracing::{debug, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -27,11 +29,13 @@ fn parse_sse_data_events(body: &str) -> Vec { .collect() } -fn read_logs(buffer: &Arc>>) -> Vec { +fn read_logs(buffer: &Arc>>) -> Vec { buffer .lock() .expect("test log buffer mutex poisoned") - .clone() + .iter() + .cloned() + .collect() } async fn log_from_nested_function() { @@ -126,7 +130,7 @@ async fn test_gateway_setup_error_includes_core_logs(_: PgPoolOptions, options: async fn scope_setup_logs_captures_logs_inside_scope() { init_tracing_once(); - let buffer = Arc::new(Mutex::new(Vec::new())); + let buffer = Arc::new(Mutex::new(VecDeque::new())); scope_setup_logs(Arc::clone(&buffer), async { info!("captured in setup scope"); @@ -142,7 +146,7 @@ async fn scope_setup_logs_captures_logs_inside_scope() { async fn nested_awaited_calls_are_captured() { init_tracing_once(); - let buffer = Arc::new(Mutex::new(Vec::new())); + let buffer = Arc::new(Mutex::new(VecDeque::new())); scope_setup_logs(Arc::clone(&buffer), async { log_from_nested_function().await; @@ -158,7 +162,7 @@ async fn nested_awaited_calls_are_captured() { async fn buffer_is_bounded_to_max_core_log_lines() { init_tracing_once(); - let buffer = Arc::new(Mutex::new(Vec::new())); + let buffer = Arc::new(Mutex::new(VecDeque::new())); scope_setup_logs(Arc::clone(&buffer), async { for idx in 0..(MAX_CORE_LOG_LINES + 5) { From b0fbfeb0dfeb0833e421ac3eb306fae0c3f5221f Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 9 Mar 2026 19:02:25 +0100 Subject: [PATCH 18/18] docstrings --- crates/defguard_core/src/setup_logs.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/defguard_core/src/setup_logs.rs b/crates/defguard_core/src/setup_logs.rs index 9b10df24b4..45376606ee 100644 --- a/crates/defguard_core/src/setup_logs.rs +++ b/crates/defguard_core/src/setup_logs.rs @@ -8,13 +8,15 @@ use tracing_subscriber::{Layer, layer::Context}; pub const MAX_CORE_LOG_LINES: usize = 200; -#[derive(Clone)] -pub struct CoreSetupLogLayer; - tokio::task_local! { static CORE_SETUP_LOGS: Arc>>; } +/// Tracing layer that appends log messages to the active setup log buffer. +#[derive(Clone)] +pub struct CoreSetupLogLayer; + +/// Runs a future with the provided setup log buffer bound to the current task. pub async fn scope_setup_logs(buffer: Arc>>, future: F) -> T where F: std::future::Future,