diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 3bbf9ec2d696..9e9b7539300b 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -258,13 +258,20 @@ fn build_transport_type_inner( Ok(TransportType::Http(http_endpoint)) } RequestPlaneMode::Tcp => { - let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); + let tcp_host = std::env::var("DYN_TCP_RPC_ADVERTISE_HOST") + .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()); // If a fixed port is explicitly configured, use it directly (no init ordering dependency). // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used). - let tcp_port = std::env::var("DYN_TCP_RPC_PORT") + let tcp_port = std::env::var("DYN_TCP_RPC_ADVERTISE_PORT") .ok() .and_then(|p| p.parse::().ok()) .filter(|&p| p != 0) + .or_else(|| { + std::env::var("DYN_TCP_RPC_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .filter(|&p| p != 0) + }) .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?); // Include instance_id and endpoint name for proper TCP routing. diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 791c9c7c67ee..4aed7e19077f 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -344,7 +344,19 @@ impl DistributedRuntime { Ok(self .tcp_server .get_or_try_init(async move { - let options = tcp::server::ServerOptions::default(); + let options = tcp::server::ServerOptions { + port: std::env::var("DYN_TCP_RESP_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(0), + interface: None, + host: std::env::var("DYN_TCP_RESP_HOST").ok(), + advertise_host: std::env::var("DYN_TCP_RESP_ADVERTISE_HOST").ok(), + advertise_port: std::env::var("DYN_TCP_RESP_ADVERTISE_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .filter(|&p| p != 0), + }; let server = tcp::server::TcpStreamServer::new(options).await?; Ok::<_, PipelineError>(server) }) diff --git a/lib/runtime/src/pipeline/network/tcp/server.rs b/lib/runtime/src/pipeline/network/tcp/server.rs index a49e1f9d3165..39c3b0bf63ff 100644 --- a/lib/runtime/src/pipeline/network/tcp/server.rs +++ b/lib/runtime/src/pipeline/network/tcp/server.rs @@ -68,6 +68,21 @@ pub struct ServerOptions { #[builder(default)] pub interface: Option, + + /// Explicit bind host address. Takes priority over interface and auto-detection. + #[builder(default)] + pub host: Option, + + /// Override the host address advertised to peers. + /// If set, this address is used in connection_info instead of the bind address. + /// The server still binds to the resolved local IP; only the advertised address changes. + #[builder(default)] + pub advertise_host: Option, + + /// Override the port advertised to peers. + /// If set, this port is used in connection_info instead of the actual listening port. + #[builder(default)] + pub advertise_port: Option, } impl ServerOptions { @@ -82,6 +97,8 @@ impl ServerOptions { pub struct TcpStreamServer { local_ip: String, local_port: u16, + advertise_ip: String, + advertise_port: u16, state: Arc>, } @@ -138,60 +155,63 @@ impl TcpStreamServer { options: ServerOptions, resolver: R, ) -> Result, PipelineError> { - let local_ip = match options.interface { - Some(interface) => { - let interfaces: HashMap = - list_afinet_netifas()?.into_iter().collect(); - - interfaces - .get(&interface) - .ok_or(PipelineError::Generic(format!( - "Interface not found: {}", - interface - )))? - .to_string() - } - None => { - let resolved_ip = resolver.local_ip().or_else(|err| match err { - Error::LocalIpAddressNotFound => resolver.local_ipv6(), - _ => Err(err), - }); - - match resolved_ip { - Ok(addr) => addr, - // Only fall back to loopback when no routable IP exists at all; - // propagate other resolver errors (I/O, platform) so - // misconfigured hosts fail fast instead of silently binding - // to 127.0.0.1. - Err(Error::LocalIpAddressNotFound) => { - tracing::warn!( - "No routable local IP address found; falling back to 127.0.0.1" - ); - IpAddr::from([127, 0, 0, 1]) - } - Err(err) => { - return Err(PipelineError::Generic(format!( - "Failed to resolve local IP address: {err}" - ))); - } - } + let local_ip = if let Some(host) = options.host { + host + } else if let Some(interface) = options.interface { + let interfaces: HashMap = + list_afinet_netifas()?.into_iter().collect(); + + interfaces + .get(&interface) + .ok_or(PipelineError::Generic(format!( + "Interface not found: {}", + interface + )))? .to_string() + } else { + let resolved_ip = resolver.local_ip().or_else(|err| match err { + Error::LocalIpAddressNotFound => resolver.local_ipv6(), + _ => Err(err), + }); + + match resolved_ip { + Ok(addr) => addr, + Err(Error::LocalIpAddressNotFound) => { + tracing::warn!( + "No routable local IP address found; falling back to 127.0.0.1" + ); + IpAddr::from([127, 0, 0, 1]) + } + Err(err) => { + return Err(PipelineError::Generic(format!( + "Failed to resolve local IP address: {err}" + ))); + } } + .to_string() }; let state = Arc::new(Mutex::new(State::default())); + let advertise_host = options.advertise_host; + let advertise_port_override = options.advertise_port; + let local_port = Self::start(local_ip.clone(), options.port, state.clone()) .await .map_err(|e| { PipelineError::Generic(format!("Failed to start TcpStreamServer: {}", e)) })?; - tracing::debug!("tcp transport service on {local_ip}:{local_port}"); + let advertise_ip = advertise_host.unwrap_or_else(|| local_ip.clone()); + let advertise_port = advertise_port_override.unwrap_or(local_port); + + tracing::debug!("tcp transport service on {local_ip}:{local_port}, advertising as {advertise_ip}:{advertise_port}"); Ok(Arc::new(Self { local_ip, local_port, + advertise_ip, + advertise_port, state, })) } @@ -239,7 +259,7 @@ impl ResponseService for TcpStreamServer { async fn register(&self, options: StreamOptions) -> PendingConnections { // oneshot channels to pass back the sender and receiver objects - let address = format!("{}:{}", self.local_ip, self.local_port); + let address = format!("{}:{}", self.advertise_ip, self.advertise_port); tracing::debug!("Registering new TcpStream on {address}"); let send_stream = if options.enable_request_stream {