Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/defguard/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ async fn main() -> Result<(), anyhow::Error> {
settings.stats_purge_threshold()
), if settings.enable_stats_purge =>
bail!("Periodic stats purge task returned early: {res:?}"),
res = run_periodic_license_check(&pool, proxy_control_tx) =>
res = run_periodic_license_check(&pool, proxy_control_tx.clone()) =>
bail!("Periodic license check task returned early: {res:?}"),
res = run_utility_thread(&pool, gateway_tx.clone()) =>
res = run_utility_thread(&pool, gateway_tx.clone(), proxy_control_tx) =>
bail!("Utility thread returned early: {res:?}"),
res = run_event_router(
RouterReceiverSet::new(
Expand Down
21 changes: 21 additions & 0 deletions crates/defguard_common/src/db/models/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ pub enum SettingsUrlError {
DefguardUrlUsesIpAddress(String),
#[error("Invalid WebAuthn configuration for defguard_url `{0}`: {1}")]
InvalidWebauthnConfiguration(String, String),
#[error("Public Edge URL is not configured")]
PublicEdgeUrlEmpty,
#[error("Unparsable Edge url: {0}")]
UnparsableEdgeUrl(String),
#[error("Edge url missing hostname: {0}")]
EdgeUrlMissingHostname(String),
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -770,6 +776,21 @@ impl Settings {
Url::parse(&self.public_proxy_url)
}

pub fn proxy_hostname(&self) -> Result<String, SettingsUrlError> {
if self.public_proxy_url.trim().is_empty() {
return Err(SettingsUrlError::PublicEdgeUrlEmpty);
}
let url = self
.proxy_public_url()
.map_err(|_err| SettingsUrlError::UnparsableEdgeUrl(self.public_proxy_url.clone()))?;
let hostname = url
.host_str()
.ok_or_else(|| SettingsUrlError::EdgeUrlMissingHostname(self.public_proxy_url.clone()))?
.to_string();
Comment thread
j-chmielewski marked this conversation as resolved.

Ok(hostname)
}

#[allow(deprecated)]
fn apply_from_config(&mut self, config: &DefGuardConfig) {
let minute = 60;
Expand Down
165 changes: 9 additions & 156 deletions crates/defguard_core/src/handlers/component_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use axum::{
extract::{Path, Query},
response::sse::{Event, KeepAlive, Sse},
};
use chrono::NaiveDateTime;
use defguard_certs::der_to_pem;
use defguard_common::{
VERSION,
Expand All @@ -32,17 +31,14 @@ use defguard_common::{
use defguard_proto::{
common::{CertificateInfo, DerPayload},
gateway::gateway_setup_client::GatewaySetupClient,
proxy::{
AcmeChallenge, AcmeLogs, AcmeStep, acme_issue_event, proxy_client::ProxyClient,
proxy_setup_client::ProxySetupClient,
},
proxy::{AcmeStep, proxy_setup_client::ProxySetupClient},
};
use defguard_version::{Version, client::ClientVersionInterceptor};
use futures::Stream;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::mpsc::{Sender, UnboundedReceiver, unbounded_channel};
use tokio_stream::StreamExt;
use tonic::{
Request, Status,
Expand All @@ -54,6 +50,7 @@ use tracing::Instrument;
use crate::{
auth::{AdminOrSetupRole, SessionInfo},
enterprise::is_enterprise_license_active,
letsencrypt::{ACME_TIMEOUT, acme_step_name, call_proxy_trigger_acme, parse_cert_expiry},
setup_logs::scope_setup_logs,
version::{MIN_GATEWAY_VERSION, MIN_PROXY_VERSION},
};
Expand Down Expand Up @@ -1059,9 +1056,6 @@ pub async fn setup_gateway_tls_stream(
Sse::new(stream).keep_alive(KeepAlive::default())
}

/// Maximum time (seconds) allowed for the ACME flow to complete end-to-end.
const ACME_TIMEOUT_SECS: u64 = 300;

#[derive(Debug, Serialize)]
struct AcmeSetupResponse {
step: &'static str,
Expand Down Expand Up @@ -1094,147 +1088,6 @@ fn acme_error_event(step: &'static str, message: String, logs: Option<Vec<String
Event::default().data(body)
}

/// Maps a proto [`AcmeStep`] to the SSE step string expected by the frontend.
fn acme_step_name(step: AcmeStep) -> &'static str {
match step {
AcmeStep::Unspecified | AcmeStep::Connecting => "Connecting",
AcmeStep::CheckingDomain => "CheckingDomain",
AcmeStep::ValidatingDomain => "ValidatingDomain",
AcmeStep::IssuingCertificate => "IssuingCertificate",
}
}

fn parse_cert_expiry(cert_pem: &str) -> Option<NaiveDateTime> {
let der = defguard_certs::parse_pem_certificate(cert_pem)
.map_err(|e| warn!("Failed to parse ACME cert PEM for expiry: {e}"))
.ok()?;
defguard_certs::CertificateInfo::from_der(&der)
.map(|info| info.not_after)
.map_err(|e| warn!("Failed to extract expiry from ACME cert: {e}"))
.ok()
}

fn public_proxy_hostname() -> Result<String, String> {
let public_proxy_url = Settings::get_current_settings().public_proxy_url;
let url = public_proxy_url.trim();

if url.is_empty() {
return Err(
"Public Edge URL is not configured. Please re-submit the external URL settings \
with a Let's Encrypt domain."
.to_string(),
);
}

Url::parse(url)
.ok()
.and_then(|u| u.host_str().map(ToString::to_string))
.filter(|host| !host.is_empty())
.ok_or_else(|| {
"Public Edge URL is not configured with a valid hostname. Please re-submit the \
external URL settings with a valid domain."
.to_string()
})
}

/// Connects to the proxy's permanent `Proxy` gRPC service and calls `TriggerAcme`.
///
/// Returns `(cert_pem, key_pem, account_credentials_json)` on success, or
/// `(error_message, log_lines)` on failure where `log_lines` are the proxy log entries
/// collected during the ACME run (sent by the proxy via an [`AcmeLogs`] event).
async fn call_proxy_trigger_acme(
pool: &PgPool,
proxy_host: &str,
proxy_port: u16,
domain: String,
account_credentials_json: String,
progress_tx: UnboundedSender<AcmeStep>,
) -> Result<(String, String, String), (String, Vec<String>)> {
let certs = Certificates::get_or_default(pool)
.await
.map_err(|e| (format!("Failed to load certificates: {e}"), Vec::new()))?;
let ca_cert_der = certs.ca_cert_der.ok_or_else(|| {
(
"CA certificate not found in settings".to_string(),
Vec::new(),
)
})?;

let cert_pem = der_to_pem(&ca_cert_der, defguard_certs::PemLabel::Certificate)
.map_err(|e| (format!("Failed to convert CA cert to PEM: {e}"), Vec::new()))?;

let endpoint_str = format!("https://{proxy_host}:{proxy_port}");
let endpoint = Endpoint::from_shared(endpoint_str)
.map_err(|e| (format!("Failed to build Edge endpoint: {e}"), Vec::new()))?
.http2_keep_alive_interval(Duration::from_secs(5))
.tcp_keepalive(Some(Duration::from_secs(5)))
.keep_alive_while_idle(true);

let tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(cert_pem));
let endpoint = endpoint.tls_config(tls).map_err(|e| {
(
format!("Failed to configure TLS for Edge endpoint: {e}"),
Vec::new(),
)
})?;

let version = Version::parse(VERSION)
.map_err(|e| (format!("Failed to parse core version: {e}"), Vec::new()))?;
let version_interceptor = ClientVersionInterceptor::new(version);

let mut client =
ProxyClient::with_interceptor(endpoint.connect_lazy(), move |req: Request<()>| {
version_interceptor.clone().call(req)
});

let mut stream = client
.trigger_acme(AcmeChallenge {
domain: domain.clone(),
account_credentials_json,
})
.await
.map_err(|e| (format!("TriggerAcme RPC failed: {e}"), Vec::new()))?
.into_inner();

let mut collected_logs: Vec<String> = Vec::new();

loop {
match stream.message().await {
Ok(Some(event)) => match event.payload {
Some(acme_issue_event::Payload::Progress(p)) => {
if let Ok(step) = AcmeStep::try_from(p.step) {
let _ = progress_tx.send(step);
}
}
Some(acme_issue_event::Payload::Certificate(cert)) => {
return Ok((cert.cert_pem, cert.key_pem, cert.account_credentials_json));
}
Some(acme_issue_event::Payload::Logs(AcmeLogs { lines })) => {
collected_logs = lines;
}
None => {
return Err((
"TriggerAcme stream sent an event with no payload".to_string(),
collected_logs,
));
}
},
Ok(None) => {
return Err((
"TriggerAcme stream ended without delivering a certificate".to_string(),
collected_logs,
));
}
Err(e) => {
return Err((
format!("Failed to read TriggerAcme response: {e}"),
collected_logs,
));
}
}
}
}

/// Streams Let's Encrypt certificate issuance progress as Server-Sent Events.
///
/// Delegates the ACME HTTP-01 process to the proxy component via the `TriggerAcme`
Expand All @@ -1259,10 +1112,11 @@ pub async fn stream_proxy_acme(
}
};

let domain = match public_proxy_hostname() {
let settings = Settings::get_current_settings();
let domain = match settings.proxy_hostname() {
Ok(domain) => domain,
Err(message) => {
yield Ok(acme_error_event("Connecting", message, None));
Err(err) => {
yield Ok(acme_error_event("Connecting", err.to_string(), None));
return;
}
};
Expand Down Expand Up @@ -1321,8 +1175,7 @@ pub async fn stream_proxy_acme(
});

let mut current_step: &'static str = "Connecting";
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(ACME_TIMEOUT_SECS);
let deadline = tokio::time::Instant::now() + ACME_TIMEOUT;

// Drain progress steps until the ACME task finishes (channel closed) or times out.
loop {
Expand All @@ -1345,7 +1198,7 @@ pub async fn stream_proxy_acme(
current_step,
format!(
"ACME certificate issuance timed out after \
{ACME_TIMEOUT_SECS} seconds."
{} seconds.", ACME_TIMEOUT.as_secs()
),
None,
));
Expand Down
Loading
Loading