Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 92 additions & 27 deletions crates/defguard_setup/src/auto_adoption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ impl Drop for TaskGuard {

fn adoption_failure(message: impl Into<String>) -> (bool, Vec<String>, Option<CertificateInfo>) {
let msg = message.into();
error!("{msg}");
(false, vec![msg], None)
}

Expand All @@ -175,8 +176,11 @@ fn collect_stream_logs(log_rx: &mut UnboundedReceiver<String>) -> Vec<String> {
}

fn adoption_failure_with_logs(
message: impl Into<String>,
log_rx: &mut UnboundedReceiver<String>,
) -> (bool, Vec<String>, Option<CertificateInfo>) {
let msg = message.into();
error!("{msg}");
let logs = collect_stream_logs(log_rx);
(false, logs, None)
}
Expand All @@ -186,6 +190,7 @@ async fn run_edge_adoption_attempt(
host: &str,
port: u16,
) -> (bool, Vec<String>, Option<CertificateInfo>) {
debug!("Starting edge adoption attempt host={host} port={port}");
let (log_tx, mut log_rx) = tokio::sync::mpsc::unbounded_channel::<String>();

let settings = Settings::get_current_settings();
Expand Down Expand Up @@ -278,10 +283,19 @@ async fn run_edge_adoption_attempt(

if let Some(edge_version) = edge_version {
if edge_version < MIN_PROXY_VERSION {
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!(
"Edge version {edge_version} is below minimum required {MIN_PROXY_VERSION}; aborting adoption"
),
&mut log_rx,
);
}
debug!("Edge version {edge_version} accepted; proceeding with CSR exchange");
} else {
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
"Edge component did not return a version header; cannot verify compatibility",
&mut log_rx,
);
}

let mut response = response_with_metadata.into_inner();
Expand Down Expand Up @@ -310,10 +324,13 @@ async fn run_edge_adoption_attempt(
let _log_task_guard = TaskGuard(log_reader_task);

let Some(hostname) = url.host_str() else {
error!("Failed to extract hostname from proxy URL");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to extract hostname from edge/proxy URL: {url}"),
&mut log_rx,
);
};

debug!("Requesting CSR from proxy hostname={hostname}");
let csr_response = match client
.get_csr(ProxyCertificateInfo {
cert_hostname: hostname.to_string(),
Expand All @@ -322,16 +339,21 @@ async fn run_edge_adoption_attempt(
{
Ok(response) => response.into_inner(),
Err(err) => {
error!("Failed to get CSR from proxy: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to get CSR from proxy: {err}"),
&mut log_rx,
);
}
};
debug!("CSR received from proxy hostname={hostname}");

let csr = match Csr::from_der(&csr_response.der_data) {
Ok(csr) => csr,
Err(err) => {
error!("Failed to parse CSR: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to parse CSR from proxy: {err}"),
&mut log_rx,
);
}
};

Expand All @@ -345,28 +367,40 @@ async fn run_edge_adoption_attempt(
let cert = match ca.sign_csr(&csr) {
Ok(cert) => cert,
Err(err) => {
error!("Failed to sign CSR: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to sign CSR for proxy: {err}"),
&mut log_rx,
);
}
};
debug!("CSR signed for proxy hostname={hostname}; sending certificate");

if let Err(err) = client
.send_cert(ProxyDerPayload {
der_data: cert.der().to_vec(),
})
.await
{
error!("Failed to send certificate to proxy: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to send certificate to proxy: {err}"),
&mut log_rx,
);
}
debug!("Certificate delivered to proxy hostname={hostname}");

let cert_info = match CertificateInfo::from_der(cert.der()) {
Ok(info) => info,
Err(err) => {
error!("Failed to parse certificate info: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to parse certificate info from proxy cert: {err}"),
&mut log_rx,
);
}
};
debug!(
"Edge adoption handshake complete hostname={hostname} cert_serial={} expires={}",
cert_info.serial, cert_info.not_after
);

let mut logs = collect_stream_logs(&mut log_rx);
if logs.is_empty() {
Expand All @@ -380,6 +414,7 @@ async fn run_gateway_adoption_attempt(
host: &str,
port: u16,
) -> (bool, Vec<String>, Option<CertificateInfo>) {
debug!("Starting gateway adoption attempt host={host} port={port}");
let (log_tx, mut log_rx) = tokio::sync::mpsc::unbounded_channel::<String>();

let settings = Settings::get_current_settings();
Expand Down Expand Up @@ -477,10 +512,19 @@ async fn run_gateway_adoption_attempt(

if let Some(gateway_version) = gateway_version {
if gateway_version < MIN_GATEWAY_VERSION {
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!(
"Gateway version {gateway_version} is below minimum required {MIN_GATEWAY_VERSION}; aborting adoption"
),
&mut log_rx,
);
}
debug!("Gateway version {gateway_version} accepted; proceeding with CSR exchange");
} else {
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
"Gateway component did not return a version header; cannot verify compatibility",
&mut log_rx,
);
}

let mut response = response_with_metadata.into_inner();
Expand Down Expand Up @@ -509,9 +553,13 @@ async fn run_gateway_adoption_attempt(
let _log_task_guard = TaskGuard(log_reader_task);

let Some(hostname) = url.host_str() else {
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to extract hostname from gateway URL: {url}"),
&mut log_rx,
);
};

debug!("Requesting CSR from gateway hostname={hostname}");
let csr_response = match client
.get_csr(GatewayCertificateInfo {
cert_hostname: hostname.to_string(),
Expand All @@ -520,16 +568,21 @@ async fn run_gateway_adoption_attempt(
{
Ok(response) => response.into_inner(),
Err(err) => {
error!("Failed to get CSR from gateway: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to get CSR from gateway: {err}"),
&mut log_rx,
);
}
};
debug!("CSR received from gateway hostname={hostname}");

let csr = match Csr::from_der(&csr_response.der_data) {
Ok(csr) => csr,
Err(err) => {
error!("Failed to parse CSR: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to parse CSR from gateway: {err}"),
&mut log_rx,
);
}
};

Expand All @@ -543,28 +596,40 @@ async fn run_gateway_adoption_attempt(
let cert = match ca.sign_csr(&csr) {
Ok(cert) => cert,
Err(err) => {
error!("Failed to sign CSR: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to sign CSR for gateway: {err}"),
&mut log_rx,
);
}
};
debug!("CSR signed for gateway hostname={hostname}; sending certificate");

if let Err(err) = client
.send_cert(GatewayDerPayload {
der_data: cert.der().to_vec(),
})
.await
{
error!("Failed to send certificate to gateway: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to send certificate to gateway: {err}"),
&mut log_rx,
);
}
debug!("Certificate delivered to gateway hostname={hostname}");

let cert_info = match CertificateInfo::from_der(cert.der()) {
Ok(info) => info,
Err(err) => {
error!("Failed to parse certificate info: {err}");
return adoption_failure_with_logs(&mut log_rx);
return adoption_failure_with_logs(
format!("Failed to parse certificate info from gateway cert: {err}"),
&mut log_rx,
);
}
};
debug!(
"Gateway adoption handshake complete hostname={hostname} cert_serial={} expires={}",
cert_info.serial, cert_info.not_after
);

let mut logs = collect_stream_logs(&mut log_rx);
if logs.is_empty() {
Expand Down
Loading