diff --git a/Cargo.lock b/Cargo.lock index 84d6c1a8..45abf98f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5509,7 +5509,7 @@ dependencies = [ [[package]] name = "sof" -version = "0.18.1" +version = "0.18.2" dependencies = [ "agave-transaction-view", "arcshift", @@ -5573,11 +5573,11 @@ dependencies = [ [[package]] name = "sof-gossip-tuning" -version = "0.18.1" +version = "0.18.2" [[package]] name = "sof-solana-compat" -version = "0.18.1" +version = "0.18.2" dependencies = [ "async-trait", "bincode", @@ -5665,7 +5665,7 @@ dependencies = [ [[package]] name = "sof-tx" -version = "0.18.1" +version = "0.18.2" dependencies = [ "arcshift", "async-trait", @@ -5697,7 +5697,7 @@ dependencies = [ [[package]] name = "sof-types" -version = "0.18.1" +version = "0.18.2" dependencies = [ "bs58", "serde", diff --git a/crates/sof-gossip-tuning/Cargo.toml b/crates/sof-gossip-tuning/Cargo.toml index f9038f46..a34b3d06 100644 --- a/crates/sof-gossip-tuning/Cargo.toml +++ b/crates/sof-gossip-tuning/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sof-gossip-tuning" -version = "0.18.1" +version = "0.18.2" edition.workspace = true description = "Typed gossip and ingest tuning presets for SOF hosts" license = "Apache-2.0 OR MIT" diff --git a/crates/sof-observer/Cargo.toml b/crates/sof-observer/Cargo.toml index 214d3b94..4433caef 100644 --- a/crates/sof-observer/Cargo.toml +++ b/crates/sof-observer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sof" -version = "0.18.1" +version = "0.18.2" edition.workspace = true description = "Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation" license = "Apache-2.0 OR MIT" @@ -32,8 +32,8 @@ provider-websocket = [] [dependencies] agave-transaction-view = { version = "3.1.11", features = ["agave-unstable-api"] } -sof-gossip-tuning = { version = "0.18.1", path = "../sof-gossip-tuning" } -sof-types = { version = "0.18.1", path = "../sof-types", features = ["solana-compat"] } +sof-gossip-tuning = { version = "0.18.2", path = "../sof-gossip-tuning" } +sof-types = { version = "0.18.2", path = "../sof-types", features = ["solana-compat"] } solana-gossip = { package = "sof-solana-gossip", version = "3.1.11-sof.9", optional = true, features = ["agave-unstable-api"] } solana-entry = { version = "3.1.11", features = ["agave-unstable-api"] } solana-epoch-schedule = "3.0.0" diff --git a/crates/sof-observer/README.md b/crates/sof-observer/README.md index a6d2e6f0..97787f70 100644 --- a/crates/sof-observer/README.md +++ b/crates/sof-observer/README.md @@ -581,7 +581,7 @@ cargo add sof Optional gossip bootstrap support at compile time: ```toml -sof = { version = "0.18.1", features = ["gossip-bootstrap"] } +sof = { version = "0.18.2", features = ["gossip-bootstrap"] } ``` `gossip-bootstrap` uses the vendored `sof-solana-gossip` backend, but it no longer exact-pins the @@ -590,7 +590,7 @@ Solana `3.1.11` patch line. Downstream crates can resolve newer compatible `3.1. Optional external `kernel-bypass` ingress support: ```toml -sof = { version = "0.18.1", features = ["kernel-bypass"] } +sof = { version = "0.18.2", features = ["kernel-bypass"] } ``` The bundled `sof-solana-gossip` backend defaults to SOF's lightweight in-memory duplicate/conflict @@ -1088,6 +1088,7 @@ Design references: - Queue pressure drops hook events instead of stalling ingest. - Typed host tuning is available through `sof-gossip-tuning` and `RuntimeSetup::with_gossip_tuning_profile(...)`. - `RuntimeExtension` WebSocket connectors support full `ws://` / `wss://` handshake + frame decoding. +- Runtime extensions require non-empty names and resource metadata; startup rejects empty `resource_id` / shared tags and bounds `read_buffer_bytes`. - WebSocket close frames emit `RuntimePacketEventClass::ConnectionClosed` in `on_packet_received`. - WebSocket packet events expose `websocket_frame_type` (`Text`/`Binary`/`Ping`/`Pong`) for startup-time filtering and runtime routing. - In gossip mode, SOF runs as an active bounded relay client by default (UDP relay + repair serve), not as an observer-only passive consumer. diff --git a/crates/sof-observer/src/framework/extension_host.rs b/crates/sof-observer/src/framework/extension_host.rs index dee3016d..b2dbdb1b 100644 --- a/crates/sof-observer/src/framework/extension_host.rs +++ b/crates/sof-observer/src/framework/extension_host.rs @@ -21,7 +21,10 @@ use tokio::{ task::JoinHandle, time::timeout, }; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; +use tokio_tungstenite::{ + MaybeTlsStream, WebSocketStream, connect_async_with_config, + tungstenite::{Message, protocol::WebSocketConfig}, +}; use crate::framework::extension::{ ExtensionCapability, ExtensionContext, ExtensionManifest, ExtensionResourceSpec, @@ -42,6 +45,10 @@ const DEFAULT_STARTUP_TIMEOUT_SECS: u64 = 5; const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 3; /// Per-read fallback buffer size used for extension resource sockets. const DEFAULT_RESOURCE_READ_BUFFER_BYTES: usize = 2_048; +/// Maximum extension resource read buffer accepted from one startup manifest. +const MAX_RESOURCE_READ_BUFFER_BYTES: usize = 1024 * 1024; +/// Multiplier used to cap extension websocket frames/messages relative to chunk size. +const EXTENSION_WEBSOCKET_MESSAGE_LIMIT_MULTIPLIER: usize = 64; /// Startup failure record for one extension. #[derive(Debug, Clone, Eq, PartialEq)] @@ -1175,18 +1182,24 @@ async fn spawn_ws_connector( extension: &Arc, spec: WsConnectorSpec, ) -> Result, String> { - let (stream, _response) = connect_async(spec.url.as_str()) - .await - .map_err(|error| format!("failed to connect websocket {}: {error}", spec.url))?; + let max_payload_chunk_bytes = spec + .read_buffer_bytes + .max(DEFAULT_RESOURCE_READ_BUFFER_BYTES); + let (stream, _response) = connect_async_with_config( + spec.url.as_str(), + Some(extension_websocket_transport_config( + max_payload_chunk_bytes, + )), + false, + ) + .await + .map_err(|error| format!("failed to connect websocket {}: {error}", spec.url))?; let io = stream.get_ref().get_ref(); let local_addr = io.local_addr().ok(); let peer_addr = io.peer_addr().ok(); let owner_extension = extension.name.to_owned(); let resource_id = spec.resource_id; let shared_tag = visibility_tag(spec.visibility); - let max_payload_chunk_bytes = spec - .read_buffer_bytes - .max(DEFAULT_RESOURCE_READ_BUFFER_BYTES); let handle = tokio::spawn(async move { let emitter = ExtensionResourceEmitter::new( host, @@ -1206,6 +1219,16 @@ async fn spawn_ws_connector( Ok(handle) } +/// Builds bounded websocket transport config for extension-owned connectors. +fn extension_websocket_transport_config(max_payload_chunk_bytes: usize) -> WebSocketConfig { + let max_message_size = max_payload_chunk_bytes + .max(DEFAULT_RESOURCE_READ_BUFFER_BYTES) + .saturating_mul(EXTENSION_WEBSOCKET_MESSAGE_LIMIT_MULTIPLIER); + WebSocketConfig::default() + .max_message_size(Some(max_message_size)) + .max_frame_size(Some(max_message_size)) +} + /// Reads packet chunks from one TCP stream and forwards them into the runtime. async fn read_tcp_stream_packets(context: ExtensionResourceReadContext, mut stream: TcpStream) { let mut buffer = vec![0_u8; context.max_payload_chunk_bytes.max(1)]; @@ -1341,6 +1364,9 @@ fn validate_manifest( manifest: &ExtensionManifest, policy: &RuntimeExtensionCapabilityPolicy, ) -> Result { + if extension_name.trim().is_empty() { + return Err("extension declares empty name".to_owned()); + } let capabilities: HashSet = manifest.capabilities.iter().copied().collect(); for capability in &capabilities { @@ -1353,25 +1379,56 @@ fn validate_manifest( let mut resource_ids = HashSet::::new(); for resource in &manifest.resources { - let (resource_id, required_capability) = match resource { - ExtensionResourceSpec::UdpListener(spec) => { - (&spec.resource_id, ExtensionCapability::BindUdp) - } - ExtensionResourceSpec::TcpListener(spec) => { - (&spec.resource_id, ExtensionCapability::BindTcp) - } - ExtensionResourceSpec::TcpConnector(spec) => { - (&spec.resource_id, ExtensionCapability::ConnectTcp) - } - ExtensionResourceSpec::WsConnector(spec) => { - (&spec.resource_id, ExtensionCapability::ConnectWebSocket) - } + let (resource_id, visibility, read_buffer_bytes, required_capability) = match resource { + ExtensionResourceSpec::UdpListener(spec) => ( + &spec.resource_id, + &spec.visibility, + spec.read_buffer_bytes, + ExtensionCapability::BindUdp, + ), + ExtensionResourceSpec::TcpListener(spec) => ( + &spec.resource_id, + &spec.visibility, + spec.read_buffer_bytes, + ExtensionCapability::BindTcp, + ), + ExtensionResourceSpec::TcpConnector(spec) => ( + &spec.resource_id, + &spec.visibility, + spec.read_buffer_bytes, + ExtensionCapability::ConnectTcp, + ), + ExtensionResourceSpec::WsConnector(spec) => ( + &spec.resource_id, + &spec.visibility, + spec.read_buffer_bytes, + ExtensionCapability::ConnectWebSocket, + ), }; + if resource_id.trim().is_empty() { + return Err(format!( + "extension `{extension_name}` declares empty resource_id" + )); + } if !resource_ids.insert(resource_id.clone()) { return Err(format!( "duplicate resource_id `{resource_id}` in startup manifest for extension `{extension_name}`" )); } + if read_buffer_bytes > MAX_RESOURCE_READ_BUFFER_BYTES { + return Err(format!( + "resource `{resource_id}` read_buffer_bytes {read_buffer_bytes} exceeds max {}", + MAX_RESOURCE_READ_BUFFER_BYTES + )); + } + if matches!( + visibility, + ExtensionStreamVisibility::Shared { tag } if tag.trim().is_empty() + ) { + return Err(format!( + "resource `{resource_id}` declares empty shared visibility tag" + )); + } if !capabilities.contains(&required_capability) { return Err(format!( "resource `{resource_id}` requires undeclared capability `{required_capability:?}`" @@ -1424,6 +1481,7 @@ mod tests { use crate::framework::ExtensionSetupError; use async_trait::async_trait; + use tokio::io::AsyncWriteExt; struct CounterExtension { name: &'static str, @@ -1553,6 +1611,120 @@ mod tests { assert_eq!(report.failed_extensions, 1); } + #[tokio::test] + async fn startup_rejects_empty_resource_id() { + let host = RuntimeExtensionHost::builder() + .add_extension(CounterExtension { + name: "empty-resource-id", + startup_manifest: ExtensionManifest { + capabilities: vec![ExtensionCapability::BindUdp], + resources: vec![ExtensionResourceSpec::UdpListener(UdpListenerSpec { + resource_id: " ".to_owned(), + bind_addr: SocketAddr::from_str("127.0.0.1:0").expect("valid addr"), + visibility: ExtensionStreamVisibility::Private, + read_buffer_bytes: 128, + })], + subscriptions: Vec::new(), + }, + packet_count: Arc::new(AtomicUsize::new(0)), + shutdown_wait: Duration::ZERO, + shutdown_called: Arc::new(AtomicBool::new(false)), + }) + .build(); + + let report = host.startup().await; + assert_eq!(report.active_extensions, 0); + assert_eq!(report.failed_extensions, 1); + assert!(report.failures[0].reason.contains("empty resource_id")); + } + + #[tokio::test] + async fn startup_rejects_empty_extension_name() { + let host = RuntimeExtensionHost::builder() + .add_extension(CounterExtension { + name: " ", + startup_manifest: ExtensionManifest { + capabilities: vec![ExtensionCapability::BindUdp], + resources: vec![ExtensionResourceSpec::UdpListener(UdpListenerSpec { + resource_id: "udp-feed".to_owned(), + bind_addr: SocketAddr::from_str("127.0.0.1:0").expect("valid addr"), + visibility: ExtensionStreamVisibility::Private, + read_buffer_bytes: 128, + })], + subscriptions: Vec::new(), + }, + packet_count: Arc::new(AtomicUsize::new(0)), + shutdown_wait: Duration::ZERO, + shutdown_called: Arc::new(AtomicBool::new(false)), + }) + .build(); + + let report = host.startup().await; + assert_eq!(report.active_extensions, 0); + assert_eq!(report.failed_extensions, 1); + assert!(report.failures[0].reason.contains("empty name")); + } + + #[tokio::test] + async fn startup_rejects_empty_shared_visibility_tag() { + let host = RuntimeExtensionHost::builder() + .add_extension(CounterExtension { + name: "empty-shared-tag", + startup_manifest: ExtensionManifest { + capabilities: vec![ExtensionCapability::BindTcp], + resources: vec![ExtensionResourceSpec::TcpListener(TcpListenerSpec { + resource_id: "tcp-feed".to_owned(), + bind_addr: SocketAddr::from_str("127.0.0.1:0").expect("valid addr"), + visibility: ExtensionStreamVisibility::Shared { + tag: " ".to_owned(), + }, + read_buffer_bytes: 128, + })], + subscriptions: Vec::new(), + }, + packet_count: Arc::new(AtomicUsize::new(0)), + shutdown_wait: Duration::ZERO, + shutdown_called: Arc::new(AtomicBool::new(false)), + }) + .build(); + + let report = host.startup().await; + assert_eq!(report.active_extensions, 0); + assert_eq!(report.failed_extensions, 1); + assert!( + report.failures[0] + .reason + .contains("empty shared visibility tag") + ); + } + + #[tokio::test] + async fn startup_rejects_oversized_read_buffer_bytes() { + let host = RuntimeExtensionHost::builder() + .add_extension(CounterExtension { + name: "oversized-read-buffer", + startup_manifest: ExtensionManifest { + capabilities: vec![ExtensionCapability::ConnectWebSocket], + resources: vec![ExtensionResourceSpec::WsConnector(WsConnectorSpec { + resource_id: "ws-feed".to_owned(), + url: "ws://127.0.0.1:1/feed".to_owned(), + visibility: ExtensionStreamVisibility::Private, + read_buffer_bytes: MAX_RESOURCE_READ_BUFFER_BYTES.saturating_add(1), + })], + subscriptions: Vec::new(), + }, + packet_count: Arc::new(AtomicUsize::new(0)), + shutdown_wait: Duration::ZERO, + shutdown_called: Arc::new(AtomicBool::new(false)), + }) + .build(); + + let report = host.startup().await; + assert_eq!(report.active_extensions, 0); + assert_eq!(report.failed_extensions, 1); + assert!(report.failures[0].reason.contains("read_buffer_bytes")); + } + #[tokio::test] async fn production_defaults_deny_outbound_connectors() { let host = RuntimeExtensionHost::production_builder() @@ -1893,11 +2065,7 @@ mod tests { let tcp_server_addr = tcp_server.local_addr().expect("tcp local addr"); let tcp_server_task = tokio::spawn(async move { if let Ok((mut stream, _)) = tcp_server.accept().await { - assert!( - tokio::io::AsyncWriteExt::write_all(&mut stream, b"tcp") - .await - .is_ok() - ); + assert!(stream.write_all(b"tcp").await.is_ok()); } }); @@ -1966,4 +2134,13 @@ mod tests { assert!(ws_server_task.await.is_ok()); host.shutdown().await; } + + #[test] + fn extension_websocket_transport_config_caps_frames_from_chunk_size() { + let config = extension_websocket_transport_config(4_096); + let expected = 4_096_usize.saturating_mul(EXTENSION_WEBSOCKET_MESSAGE_LIMIT_MULTIPLIER); + + assert_eq!(config.max_frame_size, Some(expected)); + assert_eq!(config.max_message_size, Some(expected)); + } } diff --git a/crates/sof-observer/src/provider_stream/websocket.rs b/crates/sof-observer/src/provider_stream/websocket.rs index 084a6f6a..f278d986 100644 --- a/crates/sof-observer/src/provider_stream/websocket.rs +++ b/crates/sof-observer/src/provider_stream/websocket.rs @@ -11,8 +11,8 @@ use std::{borrow::Cow, str::FromStr, sync::Arc, time::Duration}; use base64::{Engine as _, engine::general_purpose::STANDARD}; use futures_util::{SinkExt, StreamExt}; -use serde::Deserialize; -use serde_json::{Value, json}; +use serde::{Deserialize, de::DeserializeOwned}; +use serde_json::{Value, from_slice as json_from_slice, json}; use simd_json::{Buffers as SimdJsonBuffers, serde::from_slice as simd_from_slice}; use sof_types::{PubkeyBytes, SignatureBytes}; use solana_pubkey::Pubkey; @@ -23,7 +23,10 @@ use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_tungstenite::{ - MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message as WsMessage, + MaybeTlsStream, WebSocketStream, connect_async_with_config, + tungstenite::{ + protocol::{Message as WsMessage, WebSocketConfig}, + }, }; use crate::{ @@ -40,6 +43,17 @@ use crate::{ }, }; +/// Maximum provider websocket frame size accepted from upstream RPC sources. +const MAX_PROVIDER_WEBSOCKET_FRAME_BYTES: usize = 16 * 1024 * 1024; +/// Maximum provider websocket message size accepted from upstream RPC sources. +const MAX_PROVIDER_WEBSOCKET_MESSAGE_BYTES: usize = 16 * 1024 * 1024; +/// Maximum replay HTTP response body accepted from companion RPC endpoints. +const MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES: usize = 64 * 1024 * 1024; +/// Connect timeout for websocket replay HTTP companion requests. +const WEBSOCKET_REPLAY_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +/// Whole-request timeout for websocket replay HTTP companion requests. +const WEBSOCKET_REPLAY_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + /// Commitment level used for websocket `transactionSubscribe` notifications. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub enum WebsocketTransactionCommitment { @@ -1941,7 +1955,12 @@ async fn run_websocket_logs_connection( async fn establish_websocket_logs_session( config: &WebsocketLogsConfig, ) -> Result { - let (mut stream, _response) = connect_async(config.endpoint()).await?; + let (mut stream, _response) = connect_async_with_config( + config.endpoint(), + Some(provider_websocket_transport_config()), + false, + ) + .await?; stream .send(WsMessage::Text( config.subscribe_request().to_string().into(), @@ -2106,7 +2125,7 @@ async fn replay_websocket_gap( return Err(WebsocketProtocolError::MissingReplayHttpEndpoint.into()); }; - let client = reqwest::Client::new(); + let client = websocket_replay_http_client()?; let head = rpc_get_slot(&client, &http_endpoint, config.commitment).await?; if head < previous_slot { return Ok(()); @@ -2177,7 +2196,12 @@ async fn establish_websocket_primary_session( { return Err(WebsocketProtocolError::MissingReplayHttpEndpoint.into()); } - let (mut stream, _response) = connect_async(config.endpoint()).await?; + let (mut stream, _response) = connect_async_with_config( + config.endpoint(), + Some(provider_websocket_transport_config()), + false, + ) + .await?; stream .send(WsMessage::Text( config.subscribe_request().to_string().into(), @@ -2187,6 +2211,12 @@ async fn establish_websocket_primary_session( Ok(stream) } +fn provider_websocket_transport_config() -> WebSocketConfig { + WebSocketConfig::default() + .max_frame_size(Some(MAX_PROVIDER_WEBSOCKET_FRAME_BYTES)) + .max_message_size(Some(MAX_PROVIDER_WEBSOCKET_MESSAGE_BYTES)) +} + fn websocket_replay_start_slot(previous_slot: u64, head: u64, replay_max_slots: u64) -> u64 { previous_slot.max( head.saturating_add(1) @@ -2207,12 +2237,78 @@ fn websocket_http_endpoint(config: &WebsocketTransactionConfig) -> Option Result { + reqwest::Client::builder() + .connect_timeout(WEBSOCKET_REPLAY_HTTP_CONNECT_TIMEOUT) + .timeout(WEBSOCKET_REPLAY_HTTP_REQUEST_TIMEOUT) + .build() + .map_err(|error| WebsocketProtocolError::HttpRpcFailed { + method: "replay-client", + detail: error.to_string(), + }) + .map_err(Into::into) +} + +async fn read_http_rpc_json_response( + response: reqwest::Response, + method: &'static str, +) -> Result, WebsocketTransactionError> +where + T: DeserializeOwned, +{ + if !response.status().is_success() { + return Err(WebsocketProtocolError::HttpRpcFailed { + method, + detail: format!("unexpected http status {}", response.status()), + } + .into()); + } + if let Some(body_len) = response.content_length() + && body_len > u64::try_from(MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES).unwrap_or(u64::MAX) + { + return Err(WebsocketProtocolError::HttpRpcFailed { + method, + detail: format!( + "response body {body_len} exceeds max {} bytes", + MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES + ), + } + .into()); + } + let mut body = Vec::new(); + let mut response = response; + while let Some(chunk) = response.chunk().await.map_err(|error| { + WebsocketProtocolError::HttpRpcFailed { + method, + detail: error.to_string(), + } + })? { + let next_len = body.len().saturating_add(chunk.len()); + if next_len > MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES { + return Err(WebsocketProtocolError::HttpRpcFailed { + method, + detail: format!( + "response body exceeds max {} bytes", + MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES + ), + } + .into()); + } + body.extend_from_slice(chunk.as_ref()); + } + json_from_slice(&body).map_err(|error| WebsocketProtocolError::HttpRpcDecodeFailed { + method, + detail: error.to_string(), + }) + .map_err(Into::into) +} + async fn rpc_get_slot( client: &reqwest::Client, endpoint: &str, commitment: WebsocketTransactionCommitment, ) -> Result { - let response: RpcJsonResponse = client + let response = client .post(endpoint) .json(&json!({ "jsonrpc": "2.0", @@ -2225,13 +2321,8 @@ async fn rpc_get_slot( .map_err(|error| WebsocketProtocolError::HttpRpcFailed { method: "getSlot", detail: error.to_string(), - })? - .json() - .await - .map_err(|error| WebsocketProtocolError::HttpRpcDecodeFailed { - method: "getSlot", - detail: error.to_string(), })?; + let response: RpcJsonResponse = read_http_rpc_json_response(response, "getSlot").await?; if let Some(error) = response.error { return Err(WebsocketProtocolError::HttpRpcFailed { method: "getSlot", @@ -2250,7 +2341,7 @@ async fn rpc_get_block( slot: u64, commitment: WebsocketTransactionCommitment, ) -> Result, WebsocketTransactionError> { - let response: RpcJsonResponse = client + let response = client .post(endpoint) .json(&json!({ "jsonrpc": "2.0", @@ -2272,13 +2363,9 @@ async fn rpc_get_block( .map_err(|error| WebsocketProtocolError::HttpRpcFailed { method: "getBlock", detail: error.to_string(), - })? - .json() - .await - .map_err(|error| WebsocketProtocolError::HttpRpcDecodeFailed { - method: "getBlock", - detail: error.to_string(), })?; + let response: RpcJsonResponse = + read_http_rpc_json_response(response, "getBlock").await?; if let Some(error) = response.error { return Err(WebsocketProtocolError::HttpRpcFailed { method: "getBlock", @@ -2564,6 +2651,7 @@ mod tests { use solana_message::{Message, VersionedMessage}; use solana_signer::Signer; use std::time::Instant; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::time::{Duration, timeout}; use tokio_tungstenite::{accept_async, tungstenite::protocol::Message as WsMessage}; @@ -2603,6 +2691,21 @@ mod tests { .into_bytes() } + async fn spawn_http_response_server(response: String) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let addr = listener.local_addr().expect("local addr"); + tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.expect("accept"); + let mut request = [0_u8; 1024]; + let _ = stream.read(&mut request).await.expect("read request"); + stream + .write_all(response.as_bytes()) + .await + .expect("write response"); + }); + format!("http://{addr}") + } + #[cfg(feature = "provider-grpc")] #[test] fn websocket_filter_shape_matches_yellowstone_config() { @@ -2691,6 +2794,19 @@ mod tests { assert!(error.to_string().contains("subscription error")); } + #[test] + fn provider_websocket_transport_config_bounds_frames_and_messages() { + let config = provider_websocket_transport_config(); + assert_eq!( + config.max_frame_size, + Some(MAX_PROVIDER_WEBSOCKET_FRAME_BYTES) + ); + assert_eq!( + config.max_message_size, + Some(MAX_PROVIDER_WEBSOCKET_MESSAGE_BYTES) + ); + } + #[test] fn websocket_config_defaults_do_not_filter_vote_or_failed() { let request = WebsocketTransactionConfig::new("wss://example.invalid").subscribe_request(); @@ -2849,6 +2965,38 @@ mod tests { ); } + #[tokio::test] + async fn websocket_rpc_get_slot_rejects_non_success_status() { + let endpoint = spawn_http_response_server( + "HTTP/1.1 503 Service Unavailable\r\ncontent-length: 0\r\n\r\n".to_owned(), + ) + .await; + let client = websocket_replay_http_client().expect("client"); + let error = rpc_get_slot(&client, &endpoint, WebsocketTransactionCommitment::Processed) + .await + .expect_err("non-success status should fail"); + assert!(error.to_string().contains("unexpected http status 503")); + } + + #[tokio::test] + async fn websocket_rpc_get_block_rejects_oversized_body() { + let oversized = MAX_WEBSOCKET_REPLAY_HTTP_BODY_BYTES.saturating_add(1); + let endpoint = spawn_http_response_server(format!( + "HTTP/1.1 200 OK\r\ncontent-length: {oversized}\r\ncontent-type: application/json\r\n\r\n" + )) + .await; + let client = websocket_replay_http_client().expect("client"); + let error = rpc_get_block( + &client, + &endpoint, + 1, + WebsocketTransactionCommitment::Processed, + ) + .await + .expect_err("oversized body should fail"); + assert!(error.to_string().contains("exceeds max")); + } + #[test] fn websocket_replay_start_slot_includes_last_seen_slot() { assert_eq!(websocket_replay_start_slot(55, 55, 128), 55); diff --git a/crates/sof-solana-compat/Cargo.toml b/crates/sof-solana-compat/Cargo.toml index 86c245c2..1085cc38 100644 --- a/crates/sof-solana-compat/Cargo.toml +++ b/crates/sof-solana-compat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sof-solana-compat" -version = "0.18.1" +version = "0.18.2" edition.workspace = true description = "Explicit Solana-coupled compatibility layer for SOF and sof-tx" license = "Apache-2.0 OR MIT" @@ -16,8 +16,8 @@ workspace = true [dependencies] async-trait = "0.1" -sof-tx = { path = "../sof-tx", version = "0.18.1" } -sof-types = { path = "../sof-types", version = "0.18.1", features = ["solana-compat"] } +sof-tx = { path = "../sof-tx", version = "0.18.2" } +sof-types = { path = "../sof-types", version = "0.18.2", features = ["solana-compat"] } bincode = "1.3.3" solana-compute-budget-interface = "3.0.0" solana-keypair = "3.0.1" @@ -30,6 +30,6 @@ solana-transaction = { version = "3.0.2", features = ["bincode"] } thiserror = "2.0" [dev-dependencies] -sof = { path = "../sof-observer", version = "0.18.1", default-features = false } -sof-tx = { path = "../sof-tx", version = "0.18.1", features = ["sof-adapters"] } +sof = { path = "../sof-observer", version = "0.18.2", default-features = false } +sof-tx = { path = "../sof-tx", version = "0.18.2", features = ["sof-adapters"] } tokio = { version = "1.48", features = ["macros", "rt-multi-thread"] } diff --git a/crates/sof-tx/Cargo.toml b/crates/sof-tx/Cargo.toml index d6ec34a6..6c0ace53 100644 --- a/crates/sof-tx/Cargo.toml +++ b/crates/sof-tx/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sof-tx" -version = "0.18.1" +version = "0.18.2" edition.workspace = true description = "SOF transaction SDK for building and submitting Solana transactions" license = "Apache-2.0 OR MIT" @@ -40,8 +40,8 @@ solana-signature = "3.1.0" solana-signer = "3.0.0" solana-system-interface = { version = "3.0.0", features = ["bincode"] } solana-transaction = { version = "3.0.2", features = ["bincode"] } -sof = { version = "0.18.1", path = "../sof-observer", default-features = false, optional = true } -sof-types = { version = "0.18.1", path = "../sof-types", features = ["solana-compat"] } +sof = { version = "0.18.2", path = "../sof-observer", default-features = false, optional = true } +sof-types = { version = "0.18.2", path = "../sof-types", features = ["solana-compat"] } thiserror = "2.0" tokio = { version = "1.48", features = ["macros", "rt-multi-thread", "net", "sync", "time"] } tonic = { version = "0.12", optional = true, default-features = false, features = ["codegen", "prost", "transport", "tls-webpki-roots"] } diff --git a/crates/sof-tx/README.md b/crates/sof-tx/README.md index ee29c86f..0f979efb 100644 --- a/crates/sof-tx/README.md +++ b/crates/sof-tx/README.md @@ -43,20 +43,20 @@ cargo add sof-tx Enable SOF runtime adapters when you want provider values from live `sof` plugin events: ```toml -sof-tx = { version = "0.18.1", features = ["sof-adapters"] } +sof-tx = { version = "0.18.2", features = ["sof-adapters"] } ``` Enable `kernel-bypass` transport hooks for kernel-bypass direct submit integrations: ```toml -sof-tx = { version = "0.18.1", features = ["kernel-bypass"] } +sof-tx = { version = "0.18.2", features = ["kernel-bypass"] } ``` Use `sof-solana-compat` when you want the Solana-native `TxBuilder` plus unsigned convenience submission helpers on top of `sof-tx`: ```toml -sof-solana-compat = "0.18.1" +sof-solana-compat = "0.18.2" ``` ## Quick Start diff --git a/crates/sof-types/Cargo.toml b/crates/sof-types/Cargo.toml index 1b5561de..e68be189 100644 --- a/crates/sof-types/Cargo.toml +++ b/crates/sof-types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sof-types" -version = "0.18.1" +version = "0.18.2" edition.workspace = true description = "Stable SOF-owned primitive types shared across SOF crates" license = "Apache-2.0 OR MIT" diff --git a/docs/architecture/runtime-extension-hooks.md b/docs/architecture/runtime-extension-hooks.md index 08f8ed0b..934c4a9f 100644 --- a/docs/architecture/runtime-extension-hooks.md +++ b/docs/architecture/runtime-extension-hooks.md @@ -69,12 +69,20 @@ Startup manifests can request runtime-managed resources: 3. `TcpConnector` 4. `WsConnector` +Manifest validation rules: + +1. extension names must be non-empty, +2. `resource_id` must be non-empty, +3. `Shared { tag }` tags must be non-empty, +4. `read_buffer_bytes` is bounded by runtime startup validation. + `WsConnector` supports full WebSocket protocol handling: 1. `ws://` and `wss://` URLs, 2. opening handshake, 3. decoded message frame delivery to extension dispatch, -4. `Ping` / `Pong` handling. +4. `Ping` / `Pong` handling, +5. bounded frame/message limits derived from `read_buffer_bytes`. ## Visibility and Sharing diff --git a/docs/gitbook/crates/sof-tx.md b/docs/gitbook/crates/sof-tx.md index 377f0830..f7f3f601 100644 --- a/docs/gitbook/crates/sof-tx.md +++ b/docs/gitbook/crates/sof-tx.md @@ -309,9 +309,9 @@ If the conceptual docs stop too early for what you need to build, open these nex ## Feature Flags ```toml -sof-tx = { version = "0.18.1", features = ["sof-adapters"] } -sof-tx = { version = "0.18.1", features = ["kernel-bypass"] } -sof-tx = { version = "0.18.1", features = ["jito-grpc"] } +sof-tx = { version = "0.18.2", features = ["sof-adapters"] } +sof-tx = { version = "0.18.2", features = ["kernel-bypass"] } +sof-tx = { version = "0.18.2", features = ["jito-grpc"] } ``` ## Good Fit diff --git a/docs/gitbook/getting-started/install-sof.md b/docs/gitbook/getting-started/install-sof.md index 3a15d21f..87506ba1 100644 --- a/docs/gitbook/getting-started/install-sof.md +++ b/docs/gitbook/getting-started/install-sof.md @@ -35,8 +35,8 @@ Only add `sof-gossip-tuning` if you are embedding `sof` and want typed host/runt Common feature combinations: ```toml -sof = { version = "0.18.1", features = ["gossip-bootstrap"] } -sof-tx = { version = "0.18.1", features = ["sof-adapters"] } +sof = { version = "0.18.2", features = ["gossip-bootstrap"] } +sof-tx = { version = "0.18.2", features = ["sof-adapters"] } ``` ## Choose Your Starting Point @@ -50,7 +50,7 @@ Start with the app shape that matches what you need to build right now. ```toml [dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -sof = "0.18.1" +sof = "0.18.2" ``` `src/main.rs`: @@ -74,7 +74,7 @@ Use this when you need ingest, plugin events, datasets, or local control-plane s [dependencies] async-trait = "0.1" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -sof = "0.18.1" +sof = "0.18.2" tracing = "0.1" ``` @@ -124,7 +124,7 @@ Use this when you already know you want to consume SOF events in your own code. ```toml [dependencies] -sof-tx = "0.18.1" +sof-tx = "0.18.2" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } ```