Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,20 @@ fn build_transport_type_inner(
Ok(TransportType::Http(http_endpoint))
}
RequestPlaneMode::Tcp => {
let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
let tcp_host = std::env::var("DYN_TCP_RPC_ADVERTISE_HOST")
.unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env());
// If a fixed port is explicitly configured, use it directly (no init ordering dependency).
// Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
let tcp_port = std::env::var("DYN_TCP_RPC_ADVERTISE_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.filter(|&p| p != 0)
.or_else(|| {
Comment thread
fergusfinn marked this conversation as resolved.
std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.filter(|&p| p != 0)
})
.unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);

// Include instance_id and endpoint name for proper TCP routing.
Expand Down
14 changes: 13 additions & 1 deletion lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,19 @@ impl DistributedRuntime {
Ok(self
.tcp_server
.get_or_try_init(async move {
let options = tcp::server::ServerOptions::default();
let options = tcp::server::ServerOptions {
port: std::env::var("DYN_TCP_RESP_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(0),
interface: None,
host: std::env::var("DYN_TCP_RESP_HOST").ok(),
advertise_host: std::env::var("DYN_TCP_RESP_ADVERTISE_HOST").ok(),
advertise_port: std::env::var("DYN_TCP_RESP_ADVERTISE_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.filter(|&p| p != 0),
};
let server = tcp::server::TcpStreamServer::new(options).await?;
Ok::<_, PipelineError>(server)
})
Expand Down
98 changes: 59 additions & 39 deletions lib/runtime/src/pipeline/network/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ pub struct ServerOptions {

#[builder(default)]
pub interface: Option<String>,

/// Explicit bind host address. Takes priority over interface and auto-detection.
#[builder(default)]
pub host: Option<String>,

/// Override the host address advertised to peers.
/// If set, this address is used in connection_info instead of the bind address.
/// The server still binds to the resolved local IP; only the advertised address changes.
#[builder(default)]
pub advertise_host: Option<String>,

/// Override the port advertised to peers.
/// If set, this port is used in connection_info instead of the actual listening port.
#[builder(default)]
pub advertise_port: Option<u16>,
}

impl ServerOptions {
Expand All @@ -82,6 +97,8 @@ impl ServerOptions {
pub struct TcpStreamServer {
local_ip: String,
local_port: u16,
advertise_ip: String,
advertise_port: u16,
state: Arc<Mutex<State>>,
}

Expand Down Expand Up @@ -138,60 +155,63 @@ impl TcpStreamServer {
options: ServerOptions,
resolver: R,
) -> Result<Arc<Self>, PipelineError> {
let local_ip = match options.interface {
Some(interface) => {
let interfaces: HashMap<String, std::net::IpAddr> =
list_afinet_netifas()?.into_iter().collect();

interfaces
.get(&interface)
.ok_or(PipelineError::Generic(format!(
"Interface not found: {}",
interface
)))?
.to_string()
}
None => {
let resolved_ip = resolver.local_ip().or_else(|err| match err {
Error::LocalIpAddressNotFound => resolver.local_ipv6(),
_ => Err(err),
});

match resolved_ip {
Ok(addr) => addr,
// Only fall back to loopback when no routable IP exists at all;
// propagate other resolver errors (I/O, platform) so
// misconfigured hosts fail fast instead of silently binding
// to 127.0.0.1.
Err(Error::LocalIpAddressNotFound) => {
tracing::warn!(
"No routable local IP address found; falling back to 127.0.0.1"
);
IpAddr::from([127, 0, 0, 1])
}
Err(err) => {
return Err(PipelineError::Generic(format!(
"Failed to resolve local IP address: {err}"
)));
}
}
let local_ip = if let Some(host) = options.host {
host
} else if let Some(interface) = options.interface {
let interfaces: HashMap<String, std::net::IpAddr> =
list_afinet_netifas()?.into_iter().collect();

interfaces
.get(&interface)
.ok_or(PipelineError::Generic(format!(
"Interface not found: {}",
interface
)))?
.to_string()
} else {
let resolved_ip = resolver.local_ip().or_else(|err| match err {
Error::LocalIpAddressNotFound => resolver.local_ipv6(),
_ => Err(err),
});

match resolved_ip {
Ok(addr) => addr,
Err(Error::LocalIpAddressNotFound) => {
tracing::warn!(
"No routable local IP address found; falling back to 127.0.0.1"
);
IpAddr::from([127, 0, 0, 1])
}
Err(err) => {
return Err(PipelineError::Generic(format!(
"Failed to resolve local IP address: {err}"
)));
}
}
.to_string()
};

let state = Arc::new(Mutex::new(State::default()));

let advertise_host = options.advertise_host;
let advertise_port_override = options.advertise_port;

let local_port = Self::start(local_ip.clone(), options.port, state.clone())
.await
.map_err(|e| {
PipelineError::Generic(format!("Failed to start TcpStreamServer: {}", e))
})?;

tracing::debug!("tcp transport service on {local_ip}:{local_port}");
let advertise_ip = advertise_host.unwrap_or_else(|| local_ip.clone());
let advertise_port = advertise_port_override.unwrap_or(local_port);

tracing::debug!("tcp transport service on {local_ip}:{local_port}, advertising as {advertise_ip}:{advertise_port}");

Ok(Arc::new(Self {
local_ip,
local_port,
advertise_ip,
advertise_port,
state,
}))
}
Expand Down Expand Up @@ -239,7 +259,7 @@ impl ResponseService for TcpStreamServer {
async fn register(&self, options: StreamOptions) -> PendingConnections {
// oneshot channels to pass back the sender and receiver objects

let address = format!("{}:{}", self.local_ip, self.local_port);
let address = format!("{}:{}", self.advertise_ip, self.advertise_port);
tracing::debug!("Registering new TcpStream on {address}");

let send_stream = if options.enable_request_stream {
Expand Down
Loading