From 34147ea9835026c6a10789e2bef3b0bbac08f529 Mon Sep 17 00:00:00 2001
From: Fergus
Date: Thu, 9 Apr 2026 16:08:10 +0100
Subject: [PATCH 1/2] feat: add TCP advertise address support for cross-cluster deployments

Add support for separating bind and advertise addresses on both the
request plane and response plane TCP servers. This enables deployments
where the frontend pod IP is not directly routable by workers (e.g.,
multi-cluster setups with NAT or VPN).

New environment variables:

Response TCP server (TcpStreamServer):
- DYN_TCP_RESP_HOST: bind host override (default: auto-detect)
- DYN_TCP_RESP_PORT: bind port override (default: OS-assigned)
- DYN_TCP_RESP_ADVERTISE_HOST: advertise host override (default: bind host)
- DYN_TCP_RESP_ADVERTISE_PORT: advertise port override (default: bind port)

Request plane TCP (endpoint registration):
- DYN_TCP_RPC_ADVERTISE_HOST: advertise host override (default: DYN_TCP_RPC_HOST)
- DYN_TCP_RPC_ADVERTISE_PORT: advertise port override (default: DYN_TCP_RPC_PORT)

When advertise vars are set, the server binds locally as usual but
communicates the advertise address to peers via connection_info and etcd
registration. This allows exposing the TCP ports via a NodePort,
LoadBalancer, or VPN endpoint without changing how the server binds.
--- lib/runtime/src/component/endpoint.rs | 12 ++- lib/runtime/src/distributed.rs | 13 ++- .../src/pipeline/network/tcp/server.rs | 98 +++++++++++-------- 3 files changed, 80 insertions(+), 43 deletions(-) diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 3bbf9ec2d696..4a7bbfbe4f33 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -258,13 +258,19 @@ fn build_transport_type_inner( Ok(TransportType::Http(http_endpoint)) } RequestPlaneMode::Tcp => { - let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); + let tcp_host = std::env::var("DYN_TCP_RPC_ADVERTISE_HOST") + .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()); // If a fixed port is explicitly configured, use it directly (no init ordering dependency). // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used). - let tcp_port = std::env::var("DYN_TCP_RPC_PORT") + let tcp_port = std::env::var("DYN_TCP_RPC_ADVERTISE_PORT") .ok() .and_then(|p| p.parse::().ok()) - .filter(|&p| p != 0) + .or_else(|| { + std::env::var("DYN_TCP_RPC_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .filter(|&p| p != 0) + }) .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?); // Include instance_id and endpoint name for proper TCP routing. 
diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 791c9c7c67ee..4c39ca3f18f6 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -344,7 +344,18 @@ impl DistributedRuntime { Ok(self .tcp_server .get_or_try_init(async move { - let options = tcp::server::ServerOptions::default(); + let options = tcp::server::ServerOptions { + port: std::env::var("DYN_TCP_RESP_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(0), + interface: None, + host: std::env::var("DYN_TCP_RESP_HOST").ok(), + advertise_host: std::env::var("DYN_TCP_RESP_ADVERTISE_HOST").ok(), + advertise_port: std::env::var("DYN_TCP_RESP_ADVERTISE_PORT") + .ok() + .and_then(|p| p.parse().ok()), + }; let server = tcp::server::TcpStreamServer::new(options).await?; Ok::<_, PipelineError>(server) }) diff --git a/lib/runtime/src/pipeline/network/tcp/server.rs b/lib/runtime/src/pipeline/network/tcp/server.rs index a49e1f9d3165..39c3b0bf63ff 100644 --- a/lib/runtime/src/pipeline/network/tcp/server.rs +++ b/lib/runtime/src/pipeline/network/tcp/server.rs @@ -68,6 +68,21 @@ pub struct ServerOptions { #[builder(default)] pub interface: Option, + + /// Explicit bind host address. Takes priority over interface and auto-detection. + #[builder(default)] + pub host: Option, + + /// Override the host address advertised to peers. + /// If set, this address is used in connection_info instead of the bind address. + /// The server still binds to the resolved local IP; only the advertised address changes. + #[builder(default)] + pub advertise_host: Option, + + /// Override the port advertised to peers. + /// If set, this port is used in connection_info instead of the actual listening port. 
+ #[builder(default)] + pub advertise_port: Option, } impl ServerOptions { @@ -82,6 +97,8 @@ impl ServerOptions { pub struct TcpStreamServer { local_ip: String, local_port: u16, + advertise_ip: String, + advertise_port: u16, state: Arc>, } @@ -138,60 +155,63 @@ impl TcpStreamServer { options: ServerOptions, resolver: R, ) -> Result, PipelineError> { - let local_ip = match options.interface { - Some(interface) => { - let interfaces: HashMap = - list_afinet_netifas()?.into_iter().collect(); - - interfaces - .get(&interface) - .ok_or(PipelineError::Generic(format!( - "Interface not found: {}", - interface - )))? - .to_string() - } - None => { - let resolved_ip = resolver.local_ip().or_else(|err| match err { - Error::LocalIpAddressNotFound => resolver.local_ipv6(), - _ => Err(err), - }); - - match resolved_ip { - Ok(addr) => addr, - // Only fall back to loopback when no routable IP exists at all; - // propagate other resolver errors (I/O, platform) so - // misconfigured hosts fail fast instead of silently binding - // to 127.0.0.1. - Err(Error::LocalIpAddressNotFound) => { - tracing::warn!( - "No routable local IP address found; falling back to 127.0.0.1" - ); - IpAddr::from([127, 0, 0, 1]) - } - Err(err) => { - return Err(PipelineError::Generic(format!( - "Failed to resolve local IP address: {err}" - ))); - } - } + let local_ip = if let Some(host) = options.host { + host + } else if let Some(interface) = options.interface { + let interfaces: HashMap = + list_afinet_netifas()?.into_iter().collect(); + + interfaces + .get(&interface) + .ok_or(PipelineError::Generic(format!( + "Interface not found: {}", + interface + )))? 
.to_string() + } else { + let resolved_ip = resolver.local_ip().or_else(|err| match err { + Error::LocalIpAddressNotFound => resolver.local_ipv6(), + _ => Err(err), + }); + + match resolved_ip { + Ok(addr) => addr, + Err(Error::LocalIpAddressNotFound) => { + tracing::warn!( + "No routable local IP address found; falling back to 127.0.0.1" + ); + IpAddr::from([127, 0, 0, 1]) + } + Err(err) => { + return Err(PipelineError::Generic(format!( + "Failed to resolve local IP address: {err}" + ))); + } } + .to_string() }; let state = Arc::new(Mutex::new(State::default())); + let advertise_host = options.advertise_host; + let advertise_port_override = options.advertise_port; + let local_port = Self::start(local_ip.clone(), options.port, state.clone()) .await .map_err(|e| { PipelineError::Generic(format!("Failed to start TcpStreamServer: {}", e)) })?; - tracing::debug!("tcp transport service on {local_ip}:{local_port}"); + let advertise_ip = advertise_host.unwrap_or_else(|| local_ip.clone()); + let advertise_port = advertise_port_override.unwrap_or(local_port); + + tracing::debug!("tcp transport service on {local_ip}:{local_port}, advertising as {advertise_ip}:{advertise_port}"); Ok(Arc::new(Self { local_ip, local_port, + advertise_ip, + advertise_port, state, })) } @@ -239,7 +259,7 @@ impl ResponseService for TcpStreamServer { async fn register(&self, options: StreamOptions) -> PendingConnections { // oneshot channels to pass back the sender and receiver objects - let address = format!("{}:{}", self.local_ip, self.local_port); + let address = format!("{}:{}", self.advertise_ip, self.advertise_port); tracing::debug!("Registering new TcpStream on {address}"); let send_stream = if options.enable_request_stream { From a1f710c0d3c316c0d17b72dd5fe9f0586d50ee2b Mon Sep 17 00:00:00 2001 From: Fergus Date: Thu, 9 Apr 2026 16:14:49 +0100 Subject: [PATCH 2/2] fix: filter zero from advertise port env vars Reject port 0 from DYN_TCP_RPC_ADVERTISE_PORT and DYN_TCP_RESP_ADVERTISE_PORT so 
templated configs that default to 0 fall through to the actual bound port instead of publishing an unconnectable address. --- lib/runtime/src/component/endpoint.rs | 1 + lib/runtime/src/distributed.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 4a7bbfbe4f33..9e9b7539300b 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -265,6 +265,7 @@ fn build_transport_type_inner( let tcp_port = std::env::var("DYN_TCP_RPC_ADVERTISE_PORT") .ok() .and_then(|p| p.parse::().ok()) + .filter(|&p| p != 0) .or_else(|| { std::env::var("DYN_TCP_RPC_PORT") .ok() diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 4c39ca3f18f6..4aed7e19077f 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -354,7 +354,8 @@ impl DistributedRuntime { advertise_host: std::env::var("DYN_TCP_RESP_ADVERTISE_HOST").ok(), advertise_port: std::env::var("DYN_TCP_RESP_ADVERTISE_PORT") .ok() - .and_then(|p| p.parse().ok()), + .and_then(|p| p.parse::().ok()) + .filter(|&p| p != 0), }; let server = tcp::server::TcpStreamServer::new(options).await?; Ok::<_, PipelineError>(server)