diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45dab360..665eaf6a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,6 +27,31 @@ jobs: - name: Build and verify publishable crate archives run: bash scripts/verify-publishable-archives.sh + typescript-sdk: + runs-on: ubuntu-latest + timeout-minutes: 20 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 10.28.2 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: "22" + cache: pnpm + cache-dependency-path: pnpm-lock.yaml + + - name: Install SDK dependencies + run: pnpm install --frozen-lockfile + + - name: Run TypeScript SDK checks + run: pnpm --dir sdks/typescript run check + fuzz-smoke: runs-on: ubuntu-latest timeout-minutes: 90 diff --git a/.github/workflows/release-typescript-sdk.yml b/.github/workflows/release-typescript-sdk.yml new file mode 100644 index 00000000..96bdac40 --- /dev/null +++ b/.github/workflows/release-typescript-sdk.yml @@ -0,0 +1,230 @@ +name: Release TypeScript SDK + +on: + push: + tags: + - "ts-sdk-v*.*.*" + workflow_dispatch: + inputs: + publish: + description: "Publish to npm instead of packing artifacts only" + required: false + default: false + type: boolean + +permissions: + contents: read + id-token: write + +concurrency: + group: release-typescript-sdk-${{ github.ref }} + cancel-in-progress: false + +env: + NODE_VERSION: "22" + SDK_DIRECTORY: sdks/typescript + +jobs: + verify: + runs-on: ubuntu-latest + timeout-minutes: 30 + outputs: + sdk_version: ${{ steps.sdk-version.outputs.value }} + publish_release: ${{ steps.release-mode.outputs.value }} + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 10.28.2 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + cache: pnpm + cache-dependency-path: 
pnpm-lock.yaml + registry-url: https://registry.npmjs.org + + - name: Decide release mode + id: release-mode + shell: bash + run: | + set -euo pipefail + if [[ "${GITHUB_EVENT_NAME}" == "push" ]]; then + echo "value=true" >> "${GITHUB_OUTPUT}" + elif [[ "${{ inputs.publish }}" == "true" ]]; then + echo "value=true" >> "${GITHUB_OUTPUT}" + else + echo "value=false" >> "${GITHUB_OUTPUT}" + fi + + - name: Read SDK version + id: sdk-version + shell: bash + run: | + set -euo pipefail + value="$(node -p "require('./${SDK_DIRECTORY}/package.json').version")" + echo "value=${value}" >> "${GITHUB_OUTPUT}" + + - name: Verify tagged commit is on main + if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') + shell: bash + run: | + set -euo pipefail + git fetch --no-tags origin main + if ! git merge-base --is-ancestor "${GITHUB_SHA}" "FETCH_HEAD"; then + echo "Tag ${GITHUB_REF_NAME} points to commit ${GITHUB_SHA} which is not on origin/main" + exit 1 + fi + + - name: Verify tag matches SDK version + if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') + shell: bash + run: | + set -euo pipefail + expected_tag="ts-sdk-v${{ steps.sdk-version.outputs.value }}" + if [[ "${GITHUB_REF_NAME}" != "${expected_tag}" ]]; then + echo "Expected tag ${expected_tag}, got ${GITHUB_REF_NAME}" + exit 1 + fi + + - name: Install workspace dependencies + run: pnpm install --frozen-lockfile + + - name: Run TypeScript SDK checks + run: pnpm --dir "${SDK_DIRECTORY}" run check + + publish-native: + needs: verify + runs-on: ${{ matrix.runner }} + timeout-minutes: 60 + strategy: + fail-fast: false + matrix: + include: + - package_dir: sdks/typescript/native/linux-x64 + package_name: "@lythaeon-sof/sdk-native-linux-x64" + runner: ubuntu-24.04 + - package_dir: sdks/typescript/native/linux-arm64 + package_name: "@lythaeon-sof/sdk-native-linux-arm64" + runner: ubuntu-24.04-arm + - package_dir: sdks/typescript/native/darwin-x64 + package_name: 
"@lythaeon-sof/sdk-native-darwin-x64" + runner: macos-15-intel + - package_dir: sdks/typescript/native/darwin-arm64 + package_name: "@lythaeon-sof/sdk-native-darwin-arm64" + runner: macos-15 + - package_dir: sdks/typescript/native/win32-x64 + package_name: "@lythaeon-sof/sdk-native-win32-x64" + runner: windows-2025 + - package_dir: sdks/typescript/native/win32-arm64 + package_name: "@lythaeon-sof/sdk-native-win32-arm64" + runner: windows-11-arm + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Rust CI toolchain and tools + uses: ./.github/actions/rust-ci-setup + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 10.28.2 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + cache: pnpm + cache-dependency-path: pnpm-lock.yaml + registry-url: https://registry.npmjs.org + + - name: Install workspace dependencies + run: pnpm install --frozen-lockfile + + - name: Publish native runtime package + if: needs.verify.outputs.publish_release == 'true' + working-directory: ${{ matrix.package_dir }} + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + shell: bash + run: pnpm publish --access public --no-git-checks + + - name: Pack native runtime package + if: needs.verify.outputs.publish_release != 'true' + working-directory: ${{ matrix.package_dir }} + shell: bash + run: pnpm pack --pack-destination ../../../dist/npm + + publish-sdk: + needs: + - verify + - publish-native + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 10.28.2 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + cache: pnpm + cache-dependency-path: pnpm-lock.yaml + registry-url: https://registry.npmjs.org + + - name: Install workspace dependencies + run: pnpm install --frozen-lockfile + + - name: Wait for native packages to index on npm + if: 
needs.verify.outputs.publish_release == 'true' + shell: bash + run: | + set -euo pipefail + version="${{ needs.verify.outputs.sdk_version }}" + packages="$(node -p "JSON.stringify(Object.keys(require('./${SDK_DIRECTORY}/package.json').optionalDependencies).sort())")" + + wait_until_indexed() { + local pkg="$1" + local attempt + for attempt in $(seq 1 30); do + if npm view "${pkg}@${version}" version >/dev/null 2>&1; then + echo ">>> ${pkg}@${version} is visible on npm" + return 0 + fi + echo ">>> Waiting for ${pkg}@${version} to appear on npm (${attempt}/30)..." + sleep 10 + done + echo ">>> ${pkg}@${version} did not appear on npm in time" + return 1 + } + + node -e "for (const pkg of ${packages}) console.log(pkg)" | while IFS= read -r package_name; do + wait_until_indexed "${package_name}" + done + + - name: Publish TypeScript SDK + if: needs.verify.outputs.publish_release == 'true' + working-directory: ${{ env.SDK_DIRECTORY }} + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + shell: bash + run: pnpm publish --access public --no-git-checks + + - name: Pack TypeScript SDK + if: needs.verify.outputs.publish_release != 'true' + working-directory: ${{ env.SDK_DIRECTORY }} + shell: bash + run: pnpm pack --pack-destination ../dist/npm diff --git a/.gitignore b/.gitignore index 8d0d085f..62d74441 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,9 @@ /target/ +**/node_modules/ +**/dist/ +**/dist-test/ +**/dist-examples/ +sdks/typescript/native/*/vendor/ # Editor/OS noise .DS_Store diff --git a/crates/sof-observer/Cargo.toml b/crates/sof-observer/Cargo.toml index 28cc7457..e33bbc40 100644 --- a/crates/sof-observer/Cargo.toml +++ b/crates/sof-observer/Cargo.toml @@ -56,7 +56,7 @@ solana-vote = "3.1.11" solana-vote-program = { version = "3.1.11", features = ["agave-unstable-api"] } reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } thiserror = "2.0" -tokio = { version = "1.48", features = ["io-util", "macros", "rt-multi-thread", "net", "sync", 
"time"] } +tokio = { version = "1.48", features = ["io-util", "macros", "net", "process", "rt-multi-thread", "sync", "time"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } wincode = "=0.1.2" @@ -167,6 +167,10 @@ name = "gossip_protocol_profile" path = "src/bin/gossip_protocol_profile.rs" required-features = ["gossip-bootstrap"] +[[bin]] +name = "sof_ts_runtime_host" +path = "src/bin/sof_ts_runtime_host.rs" + [[bench]] name = "hot_paths" harness = false diff --git a/crates/sof-observer/src/bin/sof_ts_runtime_host.rs b/crates/sof-observer/src/bin/sof_ts_runtime_host.rs new file mode 100644 index 00000000..85f872e8 --- /dev/null +++ b/crates/sof-observer/src/bin/sof_ts_runtime_host.rs @@ -0,0 +1,2707 @@ +//! Native runtime host used by the TypeScript SDK `App.run()` handoff. + +#[cfg(all(target_os = "linux", feature = "kernel-bypass"))] +#[path = "sof_ts_runtime_host/af_xdp.rs"] +mod af_xdp; + +#[cfg(feature = "provider-grpc")] +use std::str::FromStr; +use std::sync::Arc; +#[cfg(all(target_os = "linux", feature = "kernel-bypass"))] +use std::sync::atomic::{AtomicBool, Ordering}; +#[cfg(all(target_os = "linux", feature = "kernel-bypass"))] +use std::time::Duration; +use std::{collections::HashMap, env, fs, net::SocketAddr, path::PathBuf}; + +use async_trait::async_trait; +#[cfg(feature = "provider-grpc")] +use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +use sof::provider_stream::websocket::{ + WebsocketLogsConfig, WebsocketLogsFilter, WebsocketPrimaryStream, + WebsocketTransactionCommitment, WebsocketTransactionConfig, spawn_websocket_logs_source, + spawn_websocket_source, +}; +#[cfg(feature = "provider-grpc")] +use sof::provider_stream::yellowstone::{ + YellowstoneGrpcCommitment, YellowstoneGrpcConfig, YellowstoneGrpcSlotsConfig, + 
YellowstoneGrpcStream, spawn_yellowstone_grpc_slot_source, spawn_yellowstone_grpc_source, +}; +#[cfg(feature = "provider-grpc")] +use sof::provider_stream::{ + ProviderStreamMode, ProviderStreamSender, create_provider_stream_queue, +}; +#[cfg(feature = "gossip-bootstrap")] +use sof::runtime::GossipRuntimeMode; +#[cfg(feature = "provider-grpc")] +use sof::{ + event::{ForkSlotStatus, TxCommitmentStatus, TxKind}, + framework::{ + AccountUpdateEvent, BlockMetaEvent, ObservedRecentBlockhashEvent, ObserverPlugin, + PluginConfig, PluginContext, PluginHost, PluginSetupError, SlotStatusEvent, + TransactionEvent, TransactionLogEvent, TransactionStatusEvent, + }, + provider_stream::{ + ProviderSourceArbitrationMode, ProviderSourceReadiness, ProviderSourceRef, + ProviderSourceRole, + }, +}; +use sof::{ + framework::{ + ExtensionCapability, ExtensionContext, ExtensionManifest, ExtensionResourceSpec, + ExtensionSetupError, ExtensionStreamVisibility, PacketSubscription, RuntimeExtension, + RuntimeExtensionHost, RuntimePacketEvent, RuntimePacketEventClass, RuntimePacketSource, + RuntimePacketSourceKind, RuntimePacketTransport, RuntimeWebSocketFrameType, + TcpConnectorSpec, TcpListenerSpec, UdpListenerSpec, WsConnectorSpec, + }, + runtime::{ObserverRuntime, RuntimeError, RuntimeSetup}, +}; +#[cfg(feature = "provider-grpc")] +use solana_pubkey::Pubkey; +#[cfg(feature = "provider-grpc")] +use solana_signature::Signature; +#[cfg(feature = "provider-grpc")] +use tokio::task::JoinHandle; +#[cfg(all(target_os = "linux", feature = "kernel-bypass"))] +use tokio::task::spawn_blocking; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader}, + process::{Child, ChildStdin, ChildStdout, Command}, + sync::{Mutex, mpsc, oneshot}, +}; + +/// Wire tag for websocket ingress handoff. +const INGRESS_KIND_WEB_SOCKET: u8 = 1; +/// Wire tag for Yellowstone gRPC ingress handoff. +const INGRESS_KIND_GRPC: u8 = 2; +/// Wire tag for gossip bootstrap ingress handoff. 
+const INGRESS_KIND_GOSSIP: u8 = 3; +/// Wire tag for direct raw-shred ingress handoff. +const INGRESS_KIND_DIRECT_SHREDS: u8 = 4; +#[cfg(feature = "provider-grpc")] +/// Wire tag for Yellowstone transaction stream selection. +const GRPC_STREAM_TRANSACTIONS: u8 = 1; +#[cfg(feature = "provider-grpc")] +/// Wire tag for Yellowstone transaction-status stream selection. +const GRPC_STREAM_TRANSACTION_STATUS: u8 = 2; +#[cfg(feature = "provider-grpc")] +/// Wire tag for Yellowstone account stream selection. +const GRPC_STREAM_ACCOUNTS: u8 = 3; +#[cfg(feature = "provider-grpc")] +/// Wire tag for Yellowstone block-meta stream selection. +const GRPC_STREAM_BLOCK_META: u8 = 4; +#[cfg(feature = "provider-grpc")] +/// Wire tag for Yellowstone slot stream selection. +const GRPC_STREAM_SLOTS: u8 = 5; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket `transactionSubscribe`. +const WEBSOCKET_STREAM_TRANSACTIONS: u8 = 1; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket `logsSubscribe`. +const WEBSOCKET_STREAM_LOGS: u8 = 2; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket `accountSubscribe`. +const WEBSOCKET_STREAM_ACCOUNT: u8 = 3; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket `programSubscribe`. +const WEBSOCKET_STREAM_PROGRAM: u8 = 4; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket logs `all` filter. +const WEBSOCKET_LOGS_FILTER_ALL: u8 = 1; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket logs `allWithVotes` filter. +const WEBSOCKET_LOGS_FILTER_ALL_WITH_VOTES: u8 = 2; +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Wire tag for websocket logs `mentions` filter. 
+const WEBSOCKET_LOGS_FILTER_MENTIONS: u8 = 3; +#[cfg(feature = "provider-grpc")] +/// Provider-stream channel capacity used by the native host. +const PROVIDER_STREAM_QUEUE_CAPACITY: usize = 4096; +/// Worker response tag for manifest delivery. +const RESPONSE_TAG_MANIFEST: u8 = 1; +/// Worker response tag for startup acknowledgement. +const RESPONSE_TAG_STARTED: u8 = 2; +/// Worker response tag for shutdown acknowledgement. +const RESPONSE_TAG_SHUTDOWN_COMPLETE: u8 = 4; +/// Successful worker result tag. +const RESULT_TAG_OK: u8 = 1; +/// Error worker result tag. +const RESULT_TAG_ERR: u8 = 2; +/// Framed worker protocol payload size prefix width in bytes. +const WORKER_FRAME_HEADER_LEN: usize = 5; +/// Maximum number of events coalesced into one batch frame. +const WORKER_BATCH_MAX_EVENTS: usize = 64; +/// Bounded worker command queue capacity before runtime callbacks apply backpressure. +const WORKER_COMMAND_QUEUE_CAPACITY: usize = 4096; +/// Frame tag for manifest request. +const WORKER_FRAME_GET_MANIFEST: u8 = 1; +/// Frame tag for startup request. +const WORKER_FRAME_START: u8 = 2; +/// Frame tag for packet batch delivery. +const WORKER_FRAME_PACKET_BATCH: u8 = 3; +/// Frame tag for shutdown request. +const WORKER_FRAME_SHUTDOWN: u8 = 4; +#[cfg(feature = "provider-grpc")] +/// Frame tag for provider-event batch delivery. +const WORKER_FRAME_PROVIDER_BATCH: u8 = 5; + +/// Errors returned by the native TypeScript runtime host. +#[derive(Debug, thiserror::Error)] +enum HostError { + /// No config path argument was supplied on the command line. + #[error("usage: sof_ts_runtime_host ")] + MissingConfigPath, + /// Reading the host config file failed. + #[error("failed to read runtime host config {path}: {source}")] + ReadConfig { + /// Path that failed to load. + path: PathBuf, + /// Underlying I/O failure. + source: std::io::Error, + }, + /// Parsing the host config JSON failed. 
+ #[error("failed to parse runtime host config {path}: {source}")] + ParseConfig { + /// Path that failed to parse. + path: PathBuf, + /// Underlying JSON parse failure. + source: serde_json::Error, + }, + /// The provided config is structurally valid JSON but semantically invalid. + #[error("{0}")] + InvalidConfig(String), + #[cfg(all(target_os = "linux", feature = "kernel-bypass"))] + /// The AF_XDP kernel-bypass producer failed. + #[error("kernel-bypass producer failed: {0}")] + KernelBypass(String), + /// The observer runtime failed while running. + #[error("runtime failed: {0}")] + Runtime(#[from] RuntimeError), +} + +/// Top-level JSON payload handed from the TypeScript SDK into the native host. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct RuntimeHostConfig { + /// Stable app name used for environment and observability. + app_name: String, + /// Runtime environment variables derived from the TypeScript runtime config. + runtime_environment: HashMap, + /// Ingress sources delegated to the native host. + ingress: Vec, + #[cfg(feature = "provider-grpc")] + /// Provider fan-in policy for multi-source provider ingress. + fan_in: Option, + /// Plugin workers launched through the stdio worker bridge. + plugin_workers: Vec, +} + +#[cfg(feature = "provider-grpc")] +/// JSON wire representation of provider fan-in arbitration. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct FanInConfig { + /// Arbitration strategy selected by the TypeScript SDK. + strategy: u8, +} + +/// JSON wire representation for one ingress source. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct IngressConfig { + /// Ingress kind discriminator. + kind: u8, + /// Stable ingress instance name. + name: String, + /// Optional bind address for raw ingress runtime setup. + bind_address: Option, + #[cfg(feature = "provider-grpc")] + /// Provider endpoint URL. 
+ endpoint: Option, + #[cfg(feature = "provider-grpc")] + /// Provider stream family discriminator. + stream: Option, + #[cfg(feature = "provider-grpc")] + /// Optional Yellowstone x-token. + x_token: Option, + #[cfg(feature = "provider-grpc")] + /// Optional provider commitment policy. + commitment: Option, + #[cfg(feature = "provider-grpc")] + /// Optional Yellowstone vote filter. + vote: Option, + #[cfg(feature = "provider-grpc")] + /// Optional Yellowstone failed filter. + failed: Option, + #[cfg(feature = "provider-grpc")] + /// Optional signature filter. + signature: Option, + #[cfg(feature = "provider-grpc")] + /// Account-include filter list. + account_include: Option>, + #[cfg(feature = "provider-grpc")] + /// Account-exclude filter list. + account_exclude: Option>, + #[cfg(feature = "provider-grpc")] + /// Account-required filter list. + account_required: Option>, + #[cfg(feature = "provider-grpc")] + /// Explicit account selector list. + accounts: Option>, + #[cfg(feature = "provider-grpc")] + /// Explicit owner selector list. + owners: Option>, + #[cfg(feature = "provider-grpc")] + /// Whether a transaction signature must be present before dispatch. + require_transaction_signature: Option, + #[cfg(feature = "provider-grpc")] + /// Source readiness policy inside provider fan-in. + readiness: Option, + #[cfg(feature = "provider-grpc")] + /// Source role inside provider fan-in. + role: Option, + #[cfg(feature = "provider-grpc")] + /// Explicit source priority inside provider fan-in. + priority: Option, + /// Websocket URL for SDK-side validation errors. + url: Option, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + /// Account pubkey for `accountSubscribe`. + account: Option, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + /// Program pubkey for `programSubscribe`. 
+ program_id: Option, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + /// Logs filter selector for `logsSubscribe`. + logs_filter: Option, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + /// Mention pubkey for websocket logs mention filters. + mentions: Option, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + /// Legacy custom JSON-RPC subscribe messages rejected by the native host. + requests: Option>, + /// Gossip bootstrap entrypoints. + entrypoints: Option>, + #[cfg(feature = "gossip-bootstrap")] + /// Gossip runtime mode selector. + runtime_mode: Option, + /// Whether the active gossip entrypoint is pinned. + entrypoint_pinned: Option, + /// Optional kernel-bypass receive configuration. + kernel_bypass: Option, +} + +/// JSON wire representation of direct AF_XDP ingest settings. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KernelBypassConfig { + /// Target interface name. + interface: String, + /// Receive queue id. + queue_id: u32, + /// Packet batch size. + batch_size: usize, + /// Number of UMEM frames to allocate. + umem_frame_count: u32, + /// RX/fill/completion ring depth. + ring_depth: u32, + /// Poll timeout in milliseconds. + poll_timeout_ms: u64, +} + +/// JSON wire representation of one stdio-backed plugin worker. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PluginWorkerConfig { + /// Stable plugin worker name. + name: String, + /// Worker manifest used to register the extension or plugin bridge. + manifest: RuntimeExtensionWorkerManifestConfig, + /// Executable command used to spawn the worker. + command: String, + /// Command-line arguments passed to the worker. + args: Vec, + /// Environment variables passed to the worker. + environment: HashMap, + #[cfg(feature = "provider-grpc")] + /// Whether the worker expects provider-event callbacks. 
+ provider_events: bool, +} + +/// Worker manifest envelope received from the TypeScript SDK. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct RuntimeExtensionWorkerManifestConfig { + /// Extension name used by the runtime extension host. + extension_name: String, + /// Runtime extension manifest payload. + manifest: ExtensionManifestConfig, +} + +/// Runtime extension manifest payload serialized from TypeScript. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ExtensionManifestConfig { + /// Extension capability discriminators. + capabilities: Vec, + /// Resource declarations owned by the extension. + resources: Vec, + /// Packet subscriptions requested by the extension. + subscriptions: Vec, +} + +/// Extension resource declaration serialized from TypeScript. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ExtensionResourceConfig { + /// Resource kind discriminator. + kind: u8, + /// Stable resource id. + resource_id: String, + /// Local bind address for listener resources. + bind_address: Option, + /// Remote address for outbound resources. + remote_address: Option, + /// URL for websocket resources. + url: Option, + /// Stream visibility policy. + visibility: ExtensionStreamVisibilityConfig, + /// Read buffer size in bytes. + read_buffer_bytes: usize, +} + +/// Visibility policy for extension-owned streams. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ExtensionStreamVisibilityConfig { + /// Visibility discriminator. + tag: u8, + /// Optional shared visibility tag. + shared_tag: Option, +} + +/// Packet subscription filter serialized from TypeScript. +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PacketSubscriptionConfig { + /// Packet source kind selector. + source_kind: Option, + /// Transport selector. + transport: Option, + /// Packet event-class selector. 
+ event_class: Option, + /// Local address filter. + local_address: Option, + /// Local port filter. + local_port: Option, + /// Remote address filter. + remote_address: Option, + /// Remote port filter. + remote_port: Option, + /// Owning extension filter. + owner_extension: Option, + /// Resource id filter. + resource_id: Option, + /// Shared stream tag filter. + shared_tag: Option, + /// Websocket frame-type selector. + web_socket_frame_type: Option, +} + +/// Runtime extension adapter that forwards packet events into one TS worker. +struct TypeScriptRuntimeExtension { + /// Shared worker bridge used for lifecycle and packet delivery. + bridge: Arc, +} + +#[cfg(feature = "provider-grpc")] +/// Observer plugin adapter that forwards provider events into one TS worker. +struct TypeScriptObserverPlugin { + /// Shared worker bridge used for lifecycle and provider-event delivery. + bridge: Arc, + /// Hook contract derived from the active provider ingress modes. + config: PluginConfig, +} + +/// Shared worker bridge for one TypeScript plugin process. +struct TypeScriptWorkerBridge { + /// Extension/plugin name registered with the runtime. + name: &'static str, + /// Worker launch configuration. + config: PluginWorkerConfig, + /// Live worker command bridge. + process: Mutex>, +} + +/// Shared worker command sender and background task. +struct TypeScriptWorkerHandle { + /// Command sender used by packet and provider callbacks. + commands: mpsc::Sender, + /// Background task driving framed worker I/O. + task: tokio::task::JoinHandle<()>, +} + +/// Spawned stdio worker process state. +struct TypeScriptWorkerProcess { + /// Child process handle. + child: Child, + /// Worker stdin used for protocol messages. + stdin: ChildStdin, + /// Framed worker stdout reader. + stdout: BufReader, +} + +/// Command sent into the background worker bridge task. +enum TypeScriptWorkerCommand { + /// Deliver one packet event. 
+ Packet(RuntimePacketEvent), + #[cfg(feature = "provider-grpc")] + /// Deliver one provider event. + ProviderEvent(Value), + /// Request worker shutdown and wait for process exit. + Shutdown { + /// Completion notifier for shutdown callers. + completed: oneshot::Sender<()>, + }, +} + +/// Lifecycle context passed into the TypeScript worker protocol. +#[derive(Debug, Serialize)] +struct WorkerContext<'name> { + #[serde(rename = "extensionName")] + /// Extension name associated with the lifecycle callback. + extension_name: &'name str, +} + +/// Starts the native TypeScript runtime host. +#[tokio::main] +async fn main() -> Result<(), HostError> { + let config_path = env::args_os() + .nth(1) + .map(PathBuf::from) + .ok_or(HostError::MissingConfigPath)?; + let config_text = fs::read_to_string(&config_path).map_err(|source| HostError::ReadConfig { + path: config_path.clone(), + source, + })?; + let config: RuntimeHostConfig = + serde_json::from_str(&config_text).map_err(|source| HostError::ParseConfig { + path: config_path, + source, + })?; + + run_config(config).await +} + +/// Routes the parsed host config into one unified runtime host composition. 
+async fn run_config(config: RuntimeHostConfig) -> Result<(), HostError> { + validate_ingress(&config)?; + + let setup = runtime_setup(&config); + let kernel_bypass = direct_shreds_ingress(&config) + .and_then(|ingress| ingress.kernel_bypass.as_ref()) + .cloned(); + let (extension_host, worker_bridges) = build_worker_bridges(&config.plugin_workers)?; + #[cfg(not(feature = "provider-grpc"))] + drop(worker_bridges); + + let runtime = ObserverRuntime::new() + .with_setup(setup) + .with_extension_host(extension_host); + + #[cfg(feature = "provider-grpc")] + let mut runtime = runtime; + + #[cfg(feature = "provider-grpc")] + let mut provider_source_handles = Vec::new(); + + #[cfg(feature = "provider-grpc")] + if config + .ingress + .iter() + .any(|ingress| ingress.kind == INGRESS_KIND_GRPC || ingress.kind == INGRESS_KIND_WEB_SOCKET) + { + let (provider_stream_tx, provider_stream_rx) = + create_provider_stream_queue(PROVIDER_STREAM_QUEUE_CAPACITY); + let mut modes = Vec::new(); + for ingress in config + .ingress + .iter() + .filter(|ingress| ingress.kind == INGRESS_KIND_GRPC) + { + let source = spawn_yellowstone_ingress( + ingress, + config.fan_in.as_ref(), + provider_stream_tx.clone(), + ) + .await?; + modes.push(source.mode); + provider_source_handles.push(source); + } + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + for ingress in config + .ingress + .iter() + .filter(|ingress| ingress.kind == INGRESS_KIND_WEB_SOCKET) + { + let source = spawn_websocket_provider_ingress( + ingress, + config.fan_in.as_ref(), + provider_stream_tx.clone(), + ) + .await?; + modes.push(source.mode); + provider_source_handles.push(source); + } + #[cfg(not(feature = "provider-websocket"))] + if config + .ingress + .iter() + .any(|ingress| ingress.kind == INGRESS_KIND_WEB_SOCKET) + { + let ingress = config + .ingress + .iter() + .find(|ingress| ingress.kind == INGRESS_KIND_WEB_SOCKET) + .map(|ingress| ingress.name.as_str()) + .unwrap_or("unknown"); + return 
Err(HostError::InvalidConfig(format!( + "websocket ingress `{ingress}` requires a runtime host built with the provider-websocket feature" + ))); + } + drop(provider_stream_tx); + + runtime = runtime + .with_plugin_host(plugin_host_from_worker_bridges(&worker_bridges, &modes)) + .with_provider_stream_ingress( + provider_stream_mode(modes.as_slice()), + provider_stream_rx, + ); + if has_raw_ingress(&config) { + runtime = runtime.with_raw_ingress_alongside_provider_stream(); + } + } + + #[cfg(not(feature = "provider-grpc"))] + if config + .ingress + .iter() + .any(|ingress| ingress.kind == INGRESS_KIND_GRPC || ingress.kind == INGRESS_KIND_WEB_SOCKET) + { + let ingress = config + .ingress + .iter() + .find(|ingress| { + ingress.kind == INGRESS_KIND_GRPC || ingress.kind == INGRESS_KIND_WEB_SOCKET + }) + .map(|ingress| (ingress.name.as_str(), ingress.kind)) + .unwrap_or(("unknown", INGRESS_KIND_GRPC)); + if ingress.1 == INGRESS_KIND_WEB_SOCKET { + return Err(HostError::InvalidConfig(format!( + "websocket ingress `{}` requires a runtime host built with the provider-grpc and provider-websocket features", + ingress.0 + ))); + } + return Err(HostError::InvalidConfig(format!( + "gRPC ingress `{}` requires a runtime host built with the provider-grpc feature", + ingress.0 + ))); + } + + let run_result = run_runtime_with_optional_kernel_bypass(runtime, kernel_bypass.as_ref()).await; + + #[cfg(feature = "provider-grpc")] + for handle in provider_source_handles { + handle.abort(); + } + + run_result +} + +/// Builds the extension host plus one shared worker bridge per TypeScript plugin. 
+fn build_worker_bridges( + workers: &[PluginWorkerConfig], +) -> Result<(RuntimeExtensionHost, Vec>), HostError> { + let mut extension_host_builder = RuntimeExtensionHost::builder(); + let mut bridges = Vec::with_capacity(workers.len()); + + for worker in workers.iter().cloned() { + let extension_name = leak_extension_name(worker.manifest.extension_name.clone())?; + let bridge = Arc::new(TypeScriptWorkerBridge { + name: extension_name, + config: worker, + process: Mutex::new(None), + }); + extension_host_builder = extension_host_builder.add_extension(TypeScriptRuntimeExtension { + bridge: Arc::clone(&bridge), + }); + bridges.push(bridge); + } + + Ok((extension_host_builder.build(), bridges)) +} + +#[cfg(feature = "provider-grpc")] +/// Builds a plugin host that reuses the shared TypeScript worker bridges. +fn plugin_host_from_worker_bridges( + bridges: &[Arc], + modes: &[ProviderStreamMode], +) -> PluginHost { + let mut plugin_host_builder = PluginHost::builder(); + for bridge in bridges { + if !bridge.config.provider_events { + continue; + } + + plugin_host_builder = plugin_host_builder.add_plugin(TypeScriptObserverPlugin { + bridge: Arc::clone(bridge), + config: provider_plugin_config(modes), + }); + } + + plugin_host_builder.build() +} + +#[cfg(all(target_os = "linux", feature = "kernel-bypass"))] +/// Runs one observer runtime and optionally attaches AF_XDP ingest until termination. 
+async fn run_runtime_with_optional_kernel_bypass( + runtime: ObserverRuntime, + kernel_bypass: Option<&KernelBypassConfig>, +) -> Result<(), HostError> { + let Some(kernel_bypass) = kernel_bypass else { + runtime.run_until_termination_signal().await?; + return Ok(()); + }; + + let producer_config = af_xdp::AfXdpConfig { + interface: kernel_bypass.interface.clone(), + queue_id: kernel_bypass.queue_id, + batch_size: kernel_bypass.batch_size, + umem_frame_count: kernel_bypass.umem_frame_count, + ring_depth: kernel_bypass.ring_depth, + poll_timeout: Duration::from_millis(kernel_bypass.poll_timeout_ms), + filter: af_xdp::PortFilter::default_sol(), + }; + let stop = Arc::new(AtomicBool::new(false)); + let producer_stop = Arc::clone(&stop); + let (tx, rx) = sof::runtime::create_kernel_bypass_ingress_queue(); + let producer_task = spawn_blocking(move || { + af_xdp::run_af_xdp_producer_until(&tx, &producer_config, &producer_stop) + }); + let runtime_result = runtime + .with_kernel_bypass_ingress(rx) + .run_until_termination_signal() + .await; + + stop.store(true, Ordering::Relaxed); + let producer_result: Result<(), af_xdp::AfXdpError> = producer_task.await.map_err(|error| { + HostError::KernelBypass(format!("AF_XDP producer task join failed: {error}")) + })?; + producer_result.map_err(|error| HostError::KernelBypass(error.to_string()))?; + runtime_result?; + Ok(()) +} + +#[cfg(not(all(target_os = "linux", feature = "kernel-bypass")))] +/// Runs one observer runtime when kernel bypass is unavailable. +async fn run_runtime_with_optional_kernel_bypass( + runtime: ObserverRuntime, + kernel_bypass: Option<&KernelBypassConfig>, +) -> Result<(), HostError> { + if kernel_bypass.is_some() { + return Err(HostError::InvalidConfig( + "kernel bypass requires a Linux runtime host built with the kernel-bypass feature" + .to_owned(), + )); + } + + runtime.run_until_termination_signal().await?; + Ok(()) +} + +/// Validates ingress combinations accepted by the native host. 
+fn validate_ingress(config: &RuntimeHostConfig) -> Result<(), HostError> { + let mut direct_shreds_count = 0_usize; + #[cfg(feature = "gossip-bootstrap")] + let mut gossip_count = 0_usize; + #[cfg(not(feature = "gossip-bootstrap"))] + let gossip_count = 0_usize; + for ingress in &config.ingress { + match ingress.kind { + INGRESS_KIND_WEB_SOCKET => match ingress.url.as_deref().map(str::trim) { + Some(url) if !url.is_empty() => {} + _ => { + return Err(HostError::InvalidConfig(format!( + "websocket ingress `{}` is missing a valid url", + ingress.name + ))); + } + }, + INGRESS_KIND_GRPC => {} + INGRESS_KIND_GOSSIP => { + #[cfg(not(feature = "gossip-bootstrap"))] + { + return Err(HostError::InvalidConfig(format!( + "gossip ingress `{}` requires a runtime host built with the gossip-bootstrap feature", + ingress.name + ))); + } + #[cfg(feature = "gossip-bootstrap")] + { + gossip_count = gossip_count.checked_add(1).ok_or_else(|| { + HostError::InvalidConfig( + "gossip ingress count overflowed during validation".to_owned(), + ) + })?; + if ingress.kernel_bypass.is_some() { + return Err(HostError::InvalidConfig(format!( + "gossip ingress `{}` cannot declare kernel bypass directly; attach kernel-bypass config to direct shred ingress instead", + ingress.name + ))); + } + if let Some(runtime_mode) = ingress.runtime_mode + && gossip_runtime_mode_from_wire(runtime_mode).is_none() + { + return Err(HostError::InvalidConfig(format!( + "unsupported gossip runtime mode {runtime_mode}" + ))); + } + } + } + INGRESS_KIND_DIRECT_SHREDS => { + direct_shreds_count = direct_shreds_count.checked_add(1).ok_or_else(|| { + HostError::InvalidConfig( + "direct shred ingress count overflowed during validation".to_owned(), + ) + })?; + if let Some(kernel_bypass) = ingress.kernel_bypass.as_ref() { + validate_kernel_bypass_config(kernel_bypass, &ingress.name)?; + #[cfg(not(all(target_os = "linux", feature = "kernel-bypass")))] + return Err(HostError::InvalidConfig(format!( + "ingress `{}` requests 
kernel bypass; this runtime host must be built on Linux with the kernel-bypass feature", + ingress.name + ))); + } + } + other => { + return Err(HostError::InvalidConfig(format!( + "ingress `{}` has unsupported kind {other}", + ingress.name + ))); + } + } + } + + if direct_shreds_count > 1 { + return Err(HostError::InvalidConfig( + "TypeScript runtime host currently supports one direct shred ingress source per app run" + .to_owned(), + )); + } + if gossip_count > 1 { + return Err(HostError::InvalidConfig( + "TypeScript runtime host currently supports one gossip ingress source per app run" + .to_owned(), + )); + } + if let (Some(bind_address), Some(gossip_bind_address)) = ( + direct_shreds_ingress(config).and_then(|ingress| ingress.bind_address.as_deref()), + gossip_ingress(config).and_then(|ingress| ingress.bind_address.as_deref()), + ) && bind_address != gossip_bind_address + { + return Err(HostError::InvalidConfig(format!( + "direct shred and gossip ingress bind addresses must match when both are configured ({bind_address} != {gossip_bind_address})" + ))); + } + + Ok(()) +} + +/// Validates one kernel-bypass config block after JSON deserialization. 
+fn validate_kernel_bypass_config( + config: &KernelBypassConfig, + ingress_name: &str, +) -> Result<(), HostError> { + if config.interface.trim().is_empty() { + return Err(HostError::InvalidConfig(format!( + "ingress `{ingress_name}` requests kernel bypass with an empty network interface" + ))); + } + if config.batch_size == 0 { + return Err(HostError::InvalidConfig(format!( + "ingress `{ingress_name}` requests kernel bypass with batchSize=0" + ))); + } + if config.umem_frame_count == 0 { + return Err(HostError::InvalidConfig(format!( + "ingress `{ingress_name}` requests kernel bypass with umemFrameCount=0" + ))); + } + if config.ring_depth == 0 { + return Err(HostError::InvalidConfig(format!( + "ingress `{ingress_name}` requests kernel bypass with ringDepth=0" + ))); + } + if config.poll_timeout_ms == 0 { + return Err(HostError::InvalidConfig(format!( + "ingress `{ingress_name}` requests kernel bypass with pollTimeoutMs=0" + ))); + } + let _queue_id = config.queue_id; + Ok(()) +} + +/// Builds the runtime setup derived from the TypeScript config. 
+fn runtime_setup(config: &RuntimeHostConfig) -> RuntimeSetup {
+    let mut setup = RuntimeSetup::new();
+    for (key, value) in &config.runtime_environment {
+        setup = setup.with_env(key, value);
+    }
+    if let Some(bind_address) = effective_bind_address(config) {
+        setup = setup.with_env("SOF_BIND", bind_address);
+    }
+    if let Some(ingress) = gossip_ingress(config)
+        && let Some(entrypoints) = ingress.entrypoints.as_ref()
+    {
+        setup = setup.with_gossip_entrypoints(entrypoints.iter().cloned());
+        if let Some(pinned) = ingress.entrypoint_pinned {
+            setup = setup.with_gossip_entrypoint_pinned(pinned);
+        }
+        #[cfg(feature = "gossip-bootstrap")]
+        if let Some(runtime_mode) = ingress.runtime_mode.and_then(gossip_runtime_mode_from_wire) {
+            setup = setup.with_gossip_runtime_mode(runtime_mode);
+        }
+    }
+    setup.with_env("SOF_TS_APP_NAME", &config.app_name)
+}
+
+#[cfg(feature = "gossip-bootstrap")]
+/// Maps the wire gossip runtime mode into the Rust runtime enum.
+const fn gossip_runtime_mode_from_wire(value: u8) -> Option<GossipRuntimeMode> {
+    match value {
+        1 => Some(GossipRuntimeMode::Full),
+        2 => Some(GossipRuntimeMode::BootstrapOnly),
+        3 => Some(GossipRuntimeMode::ControlPlaneOnly),
+        _ => None,
+    }
+}
+
+/// Returns the direct-shreds ingress when present.
+fn direct_shreds_ingress(config: &RuntimeHostConfig) -> Option<&IngressConfig> {
+    config
+        .ingress
+        .iter()
+        .find(|ingress| ingress.kind == INGRESS_KIND_DIRECT_SHREDS)
+}
+
+/// Returns the gossip ingress when present.
+fn gossip_ingress(config: &RuntimeHostConfig) -> Option<&IngressConfig> {
+    config
+        .ingress
+        .iter()
+        .find(|ingress| ingress.kind == INGRESS_KIND_GOSSIP)
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Returns whether the config declares any raw packet ingress.
+fn has_raw_ingress(config: &RuntimeHostConfig) -> bool { + config.ingress.iter().any(|ingress| { + ingress.kind == INGRESS_KIND_DIRECT_SHREDS || ingress.kind == INGRESS_KIND_GOSSIP + }) +} + +/// Returns the bind address that should drive raw runtime setup. +fn effective_bind_address(config: &RuntimeHostConfig) -> Option<&str> { + direct_shreds_ingress(config) + .and_then(|ingress| ingress.bind_address.as_deref()) + .or_else(|| gossip_ingress(config).and_then(|ingress| ingress.bind_address.as_deref())) +} + +#[cfg(feature = "provider-grpc")] +/// Yellowstone source task handle plus the runtime mode it drives. +struct ProviderSourceHandle { + /// Runtime mode inferred from the configured source. + mode: ProviderStreamMode, + /// Abort handle for the underlying provider source task. + abort_handle: tokio::task::AbortHandle, + /// Join guard that logs provider source task failures. + join_guard: JoinHandle<()>, +} + +#[cfg(feature = "provider-grpc")] +impl ProviderSourceHandle { + /// Stops the provider source task and its join guard. + fn abort(self) { + self.abort_handle.abort(); + self.join_guard.abort(); + } +} + +#[cfg(feature = "provider-grpc")] +/// Wraps one provider source task so failures are not silently detached. 
+fn spawn_provider_source_join_guard<E>(
+    source_name: &str,
+    handle: JoinHandle<Result<(), E>>,
+) -> (tokio::task::AbortHandle, JoinHandle<()>)
+where
+    E: std::fmt::Display + Send + 'static,
+{
+    let abort_handle = handle.abort_handle();
+    let source_name = source_name.to_owned();
+    let join_guard = tokio::spawn(async move {
+        match handle.await {
+            Ok(Ok(())) => {
+                tracing::warn!(source = source_name, "provider source task ended");
+            }
+            Ok(Err(error)) => {
+                tracing::warn!(source = source_name, error = %error, "provider source task failed");
+            }
+            Err(error) => {
+                if !error.is_cancelled() {
+                    tracing::warn!(source = source_name, error = %error, "provider source task join failed");
+                }
+            }
+        }
+    });
+    (abort_handle, join_guard)
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Spawns one Yellowstone ingress source from the wire config.
+async fn spawn_yellowstone_ingress(
+    ingress: &IngressConfig,
+    fan_in: Option<&FanInConfig>,
+    sender: ProviderStreamSender,
+) -> Result<ProviderSourceHandle, HostError> {
+    let endpoint = ingress.endpoint.as_deref().ok_or_else(|| {
+        HostError::InvalidConfig(format!(
+            "gRPC ingress `{}` is missing endpoint",
+            ingress.name
+        ))
+    })?;
+    let stream = ingress.stream.unwrap_or(GRPC_STREAM_TRANSACTIONS);
+
+    if stream == GRPC_STREAM_SLOTS {
+        let mut config =
+            YellowstoneGrpcSlotsConfig::new(endpoint).with_source_instance(ingress.name.clone());
+        config = apply_yellowstone_slot_source_policy(config, ingress)?;
+        config = apply_yellowstone_slot_fan_in(config, fan_in)?;
+        if let Some(x_token) = non_empty_optional(ingress.x_token.as_deref()) {
+            config = config.with_x_token(x_token.to_owned());
+        }
+        if let Some(commitment) = ingress.commitment {
+            config = config.with_commitment(yellowstone_commitment_from_wire(commitment)?);
+        }
+        let mode = config.runtime_mode();
+        let handle = spawn_yellowstone_grpc_slot_source(config, sender)
+            .await
+            .map_err(|error| HostError::InvalidConfig(error.to_string()))?;
+        let (abort_handle, join_guard) = 
spawn_provider_source_join_guard(&ingress.name, handle); + return Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }); + } + + let mut config = YellowstoneGrpcConfig::new(endpoint) + .with_source_instance(ingress.name.clone()) + .with_stream(yellowstone_stream_from_wire(stream)?); + config = apply_yellowstone_source_policy(config, ingress)?; + config = apply_yellowstone_fan_in(config, fan_in)?; + if let Some(x_token) = non_empty_optional(ingress.x_token.as_deref()) { + config = config.with_x_token(x_token.to_owned()); + } + if let Some(commitment) = ingress.commitment { + config = config.with_commitment(yellowstone_commitment_from_wire(commitment)?); + } + if let Some(vote) = ingress.vote { + config = config.with_vote(vote); + } + if let Some(failed) = ingress.failed { + config = config.with_failed(failed); + } + if let Some(signature) = non_empty_optional(ingress.signature.as_deref()) { + config = config.with_signature(parse_signature(signature, "ingress.signature")?); + } + config = config + .with_account_include(parse_pubkeys( + ingress.account_include.as_deref().unwrap_or(&[]), + "ingress.accountInclude", + )?) + .with_account_exclude(parse_pubkeys( + ingress.account_exclude.as_deref().unwrap_or(&[]), + "ingress.accountExclude", + )?) + .with_account_required(parse_pubkeys( + ingress.account_required.as_deref().unwrap_or(&[]), + "ingress.accountRequired", + )?) + .with_accounts(parse_pubkeys( + ingress.accounts.as_deref().unwrap_or(&[]), + "ingress.accounts", + )?) 
+ .with_owners(parse_pubkeys( + ingress.owners.as_deref().unwrap_or(&[]), + "ingress.owners", + )?); + if ingress.require_transaction_signature.unwrap_or(false) { + config = config.require_transaction_signature(); + } + + let mode = config.runtime_mode(); + let handle = spawn_yellowstone_grpc_source(config, sender) + .await + .map_err(|error| HostError::InvalidConfig(error.to_string()))?; + let (abort_handle, join_guard) = spawn_provider_source_join_guard(&ingress.name, handle); + Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }) +} + +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Spawns one websocket provider-stream source from the wire config. +async fn spawn_websocket_provider_ingress( + ingress: &IngressConfig, + fan_in: Option<&FanInConfig>, + sender: ProviderStreamSender, +) -> Result { + if !ingress.requests.as_deref().unwrap_or(&[]).is_empty() { + return Err(HostError::InvalidConfig(format!( + "websocket ingress `{}` must use the native SOF provider-stream websocket adapter; custom JSON-RPC requests are not accepted by the runtime host", + ingress.name + ))); + } + let endpoint = ingress.url.as_deref().ok_or_else(|| { + HostError::InvalidConfig(format!( + "websocket ingress `{}` is missing url", + ingress.name + )) + })?; + match ingress.stream.unwrap_or(WEBSOCKET_STREAM_TRANSACTIONS) { + WEBSOCKET_STREAM_TRANSACTIONS => { + let mut config = WebsocketTransactionConfig::new(endpoint) + .with_source_instance(ingress.name.clone()); + config = apply_websocket_source_policy(config, ingress)?; + config = apply_websocket_fan_in(config, fan_in)?; + if let Some(commitment) = ingress.commitment { + config = config.with_commitment(websocket_commitment_from_wire(commitment)?); + } + if let Some(vote) = ingress.vote { + config = config.with_vote(vote); + } + if let Some(failed) = ingress.failed { + config = config.with_failed(failed); + } + if let Some(signature) = non_empty_optional(ingress.signature.as_deref()) { + 
config = config.with_signature(parse_signature(signature, "ingress.signature")?); + } + config = config + .with_account_include(parse_pubkeys( + ingress.account_include.as_deref().unwrap_or(&[]), + "ingress.accountInclude", + )?) + .with_account_exclude(parse_pubkeys( + ingress.account_exclude.as_deref().unwrap_or(&[]), + "ingress.accountExclude", + )?) + .with_account_required(parse_pubkeys( + ingress.account_required.as_deref().unwrap_or(&[]), + "ingress.accountRequired", + )?); + + let mode = config.runtime_mode(); + let handle = spawn_websocket_source(&config, sender) + .await + .map_err(|error| HostError::InvalidConfig(error.to_string()))?; + let (abort_handle, join_guard) = + spawn_provider_source_join_guard(&ingress.name, handle); + Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }) + } + WEBSOCKET_STREAM_LOGS => { + let mut config = + WebsocketLogsConfig::new(endpoint).with_source_instance(ingress.name.clone()); + config = apply_websocket_logs_source_policy(config, ingress)?; + config = apply_websocket_logs_fan_in(config, fan_in)?; + if let Some(commitment) = ingress.commitment { + config = config.with_commitment(websocket_commitment_from_wire(commitment)?); + } + config = config.with_filter(websocket_logs_filter_from_wire(ingress)?); + + let mode = config.runtime_mode(); + let handle = spawn_websocket_logs_source(&config, sender) + .await + .map_err(|error| HostError::InvalidConfig(error.to_string()))?; + let (abort_handle, join_guard) = + spawn_provider_source_join_guard(&ingress.name, handle); + Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }) + } + WEBSOCKET_STREAM_ACCOUNT => { + let account = parse_pubkey( + ingress.account.as_deref(), + "ingress.account", + "websocket account stream", + )?; + let mut config = WebsocketTransactionConfig::new(endpoint) + .with_source_instance(ingress.name.clone()) + .with_stream(WebsocketPrimaryStream::Account(account)); + config = apply_websocket_source_policy(config, ingress)?; + 
config = apply_websocket_fan_in(config, fan_in)?; + if let Some(commitment) = ingress.commitment { + config = config.with_commitment(websocket_commitment_from_wire(commitment)?); + } + + let mode = config.runtime_mode(); + let handle = spawn_websocket_source(&config, sender) + .await + .map_err(|error| HostError::InvalidConfig(error.to_string()))?; + let (abort_handle, join_guard) = + spawn_provider_source_join_guard(&ingress.name, handle); + Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }) + } + WEBSOCKET_STREAM_PROGRAM => { + let program_id = parse_pubkey( + ingress.program_id.as_deref(), + "ingress.programId", + "websocket program stream", + )?; + let mut config = WebsocketTransactionConfig::new(endpoint) + .with_source_instance(ingress.name.clone()) + .with_stream(WebsocketPrimaryStream::Program(program_id)); + config = apply_websocket_source_policy(config, ingress)?; + config = apply_websocket_fan_in(config, fan_in)?; + if let Some(commitment) = ingress.commitment { + config = config.with_commitment(websocket_commitment_from_wire(commitment)?); + } + + let mode = config.runtime_mode(); + let handle = spawn_websocket_source(&config, sender) + .await + .map_err(|error| HostError::InvalidConfig(error.to_string()))?; + let (abort_handle, join_guard) = + spawn_provider_source_join_guard(&ingress.name, handle); + Ok(ProviderSourceHandle { + mode, + abort_handle, + join_guard, + }) + } + other => Err(HostError::InvalidConfig(format!( + "unsupported websocket stream kind {other}" + ))), + } +} + +#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] +/// Applies per-source readiness, role, and priority to one websocket config. 
+fn apply_websocket_source_policy(
+    mut config: WebsocketTransactionConfig,
+    ingress: &IngressConfig,
+) -> Result<WebsocketTransactionConfig, HostError> {
+    if let Some(readiness) = ingress.readiness {
+        config = config.with_readiness(provider_readiness_from_wire(readiness)?);
+    }
+    if let Some(role) = ingress.role {
+        config = config.with_source_role(provider_role_from_wire(role)?);
+    }
+    if let Some(priority) = ingress.priority {
+        config = config.with_source_priority(priority);
+    }
+    Ok(config)
+}
+
+#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))]
+/// Applies fan-in arbitration to one websocket config.
+fn apply_websocket_fan_in(
+    config: WebsocketTransactionConfig,
+    fan_in: Option<&FanInConfig>,
+) -> Result<WebsocketTransactionConfig, HostError> {
+    Ok(config.with_source_arbitration(provider_source_arbitration(fan_in)?))
+}
+
+#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))]
+/// Applies per-source readiness, role, and priority to one websocket logs config.
+fn apply_websocket_logs_source_policy(
+    mut config: WebsocketLogsConfig,
+    ingress: &IngressConfig,
+) -> Result<WebsocketLogsConfig, HostError> {
+    if let Some(readiness) = ingress.readiness {
+        config = config.with_readiness(provider_readiness_from_wire(readiness)?);
+    }
+    if let Some(role) = ingress.role {
+        config = config.with_source_role(provider_role_from_wire(role)?);
+    }
+    if let Some(priority) = ingress.priority {
+        config = config.with_source_priority(priority);
+    }
+    Ok(config)
+}
+
+#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))]
+/// Applies fan-in arbitration to one websocket logs config.
+fn apply_websocket_logs_fan_in(
+    config: WebsocketLogsConfig,
+    fan_in: Option<&FanInConfig>,
+) -> Result<WebsocketLogsConfig, HostError> {
+    Ok(config.with_source_arbitration(provider_source_arbitration(fan_in)?))
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Applies per-source readiness, role, and priority to one Yellowstone config.
+fn apply_yellowstone_source_policy(
+    mut config: YellowstoneGrpcConfig,
+    ingress: &IngressConfig,
+) -> Result<YellowstoneGrpcConfig, HostError> {
+    if let Some(readiness) = ingress.readiness {
+        config = config.with_readiness(provider_readiness_from_wire(readiness)?);
+    }
+    if let Some(role) = ingress.role {
+        config = config.with_source_role(provider_role_from_wire(role)?);
+    }
+    if let Some(priority) = ingress.priority {
+        config = config.with_source_priority(priority);
+    }
+    Ok(config)
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Applies per-source readiness, role, and priority to one Yellowstone slots config.
+fn apply_yellowstone_slot_source_policy(
+    mut config: YellowstoneGrpcSlotsConfig,
+    ingress: &IngressConfig,
+) -> Result<YellowstoneGrpcSlotsConfig, HostError> {
+    if let Some(readiness) = ingress.readiness {
+        config = config.with_readiness(provider_readiness_from_wire(readiness)?);
+    }
+    if let Some(role) = ingress.role {
+        config = config.with_source_role(provider_role_from_wire(role)?);
+    }
+    if let Some(priority) = ingress.priority {
+        config = config.with_source_priority(priority);
+    }
+    Ok(config)
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Applies fan-in arbitration to one Yellowstone config.
+fn apply_yellowstone_fan_in(
+    config: YellowstoneGrpcConfig,
+    fan_in: Option<&FanInConfig>,
+) -> Result<YellowstoneGrpcConfig, HostError> {
+    Ok(config.with_source_arbitration(provider_source_arbitration(fan_in)?))
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Applies fan-in arbitration to one Yellowstone slots config.
+fn apply_yellowstone_slot_fan_in(
+    config: YellowstoneGrpcSlotsConfig,
+    fan_in: Option<&FanInConfig>,
+) -> Result<YellowstoneGrpcSlotsConfig, HostError> {
+    Ok(config.with_source_arbitration(provider_source_arbitration(fan_in)?))
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Maps the wire fan-in strategy into the Rust arbitration enum.
+fn provider_source_arbitration(
+    fan_in: Option<&FanInConfig>,
+) -> Result<ProviderSourceArbitrationMode, HostError> {
+    let Some(fan_in) = fan_in else {
+        return Ok(ProviderSourceArbitrationMode::FirstSeen);
+    };
+    match fan_in.strategy {
+        1 => Ok(ProviderSourceArbitrationMode::EmitAll),
+        2 => Ok(ProviderSourceArbitrationMode::FirstSeen),
+        3 => Ok(ProviderSourceArbitrationMode::FirstSeenThenPromote),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported fan-in arbitration mode {other}"
+        ))),
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Maps the wire provider readiness selector into the Rust enum.
+fn provider_readiness_from_wire(value: u8) -> Result<ProviderSourceReadiness, HostError> {
+    match value {
+        1 => Ok(ProviderSourceReadiness::Required),
+        2 => Ok(ProviderSourceReadiness::Optional),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported provider ingress readiness {other}"
+        ))),
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Maps the wire provider role selector into the Rust enum.
+fn provider_role_from_wire(value: u8) -> Result<ProviderSourceRole, HostError> {
+    match value {
+        1 => Ok(ProviderSourceRole::Primary),
+        2 => Ok(ProviderSourceRole::Secondary),
+        3 => Ok(ProviderSourceRole::Fallback),
+        4 => Ok(ProviderSourceRole::ConfirmOnly),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported provider ingress role {other}"
+        ))),
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Collapses one or more source runtime modes into the runtime ingress mode.
+fn provider_stream_mode(modes: &[ProviderStreamMode]) -> ProviderStreamMode {
+    match modes {
+        [mode] => *mode,
+        _ => ProviderStreamMode::Generic,
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Derives one observer hook mask from the active provider ingress modes.
+fn provider_plugin_config(modes: &[ProviderStreamMode]) -> PluginConfig { + let mut config = PluginConfig::new(); + for mode in modes { + match mode { + ProviderStreamMode::Generic => { + config = config + .with_transaction() + .with_transaction_status() + .with_account_update() + .with_block_meta() + .with_slot_status() + .with_recent_blockhash(); + config.transaction_log = true; + } + ProviderStreamMode::YellowstoneGrpc => { + config = config.with_transaction().with_recent_blockhash(); + } + ProviderStreamMode::YellowstoneGrpcTransactionStatus => { + config = config.with_transaction_status(); + } + ProviderStreamMode::YellowstoneGrpcAccounts => { + config = config.with_account_update(); + } + ProviderStreamMode::YellowstoneGrpcBlockMeta => { + config = config.with_block_meta(); + } + ProviderStreamMode::YellowstoneGrpcSlots => { + config = config.with_slot_status(); + } + ProviderStreamMode::LaserStream => { + config = config.with_transaction().with_recent_blockhash(); + } + ProviderStreamMode::LaserStreamTransactionStatus => { + config = config.with_transaction_status(); + } + ProviderStreamMode::LaserStreamAccounts => { + config = config.with_account_update(); + } + ProviderStreamMode::LaserStreamBlockMeta => { + config = config.with_block_meta(); + } + ProviderStreamMode::LaserStreamSlots => { + config = config.with_slot_status(); + } + #[cfg(feature = "provider-websocket")] + ProviderStreamMode::WebsocketTransaction => { + config = config.with_transaction().with_recent_blockhash(); + } + #[cfg(feature = "provider-websocket")] + ProviderStreamMode::WebsocketLogs => { + config.transaction_log = true; + } + #[cfg(feature = "provider-websocket")] + ProviderStreamMode::WebsocketAccount | ProviderStreamMode::WebsocketProgram => { + config = config.with_account_update(); + } + } + } + + config +} + +#[cfg(feature = "provider-grpc")] +/// Maps the wire Yellowstone stream selector into the Rust enum. 
+fn yellowstone_stream_from_wire(value: u8) -> Result<YellowstoneGrpcStream, HostError> {
+    match value {
+        GRPC_STREAM_TRANSACTIONS => Ok(YellowstoneGrpcStream::Transaction),
+        GRPC_STREAM_TRANSACTION_STATUS => Ok(YellowstoneGrpcStream::TransactionStatus),
+        GRPC_STREAM_ACCOUNTS => Ok(YellowstoneGrpcStream::Accounts),
+        GRPC_STREAM_BLOCK_META => Ok(YellowstoneGrpcStream::BlockMeta),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported gRPC stream kind {other}"
+        ))),
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Maps the wire Yellowstone commitment selector into the Rust enum.
+fn yellowstone_commitment_from_wire(value: u8) -> Result<YellowstoneGrpcCommitment, HostError> {
+    match value {
+        1 => Ok(YellowstoneGrpcCommitment::Processed),
+        2 => Ok(YellowstoneGrpcCommitment::Confirmed),
+        3 => Ok(YellowstoneGrpcCommitment::Finalized),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported gRPC commitment {other}"
+        ))),
+    }
+}
+
+#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))]
+/// Maps the wire websocket commitment selector into the Rust enum.
+fn websocket_commitment_from_wire(value: u8) -> Result<WebsocketTransactionCommitment, HostError> {
+    match value {
+        1 => Ok(WebsocketTransactionCommitment::Processed),
+        2 => Ok(WebsocketTransactionCommitment::Confirmed),
+        3 => Ok(WebsocketTransactionCommitment::Finalized),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported websocket commitment {other}"
+        ))),
+    }
+}
+
+#[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))]
+/// Maps the wire websocket logs filter selector into the Rust enum.
+fn websocket_logs_filter_from_wire(
+    ingress: &IngressConfig,
+) -> Result<WebsocketLogsFilter, HostError> {
+    match ingress.logs_filter.unwrap_or(WEBSOCKET_LOGS_FILTER_ALL) {
+        WEBSOCKET_LOGS_FILTER_ALL => Ok(WebsocketLogsFilter::All),
+        WEBSOCKET_LOGS_FILTER_ALL_WITH_VOTES => Ok(WebsocketLogsFilter::AllWithVotes),
+        WEBSOCKET_LOGS_FILTER_MENTIONS => Ok(WebsocketLogsFilter::Mentions(parse_pubkey(
+            ingress.mentions.as_deref(),
+            "ingress.mentions",
+            "websocket logs mention filter",
+        )?)),
+        other => Err(HostError::InvalidConfig(format!(
+            "unsupported websocket logs filter {other}"
+        ))),
+    }
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Trims and filters empty optional string values.
+fn non_empty_optional(value: Option<&str>) -> Option<&str> {
+    value.map(str::trim).filter(|value| !value.is_empty())
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Parses one base58 transaction signature from the wire config.
+fn parse_signature(value: &str, field: &str) -> Result<Signature, HostError> {
+    Signature::from_str(value).map_err(|error| {
+        HostError::InvalidConfig(format!("{field} is not a valid signature: {error}"))
+    })
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Parses one required pubkey from the wire config.
+fn parse_pubkey(value: Option<&str>, field: &str, context: &str) -> Result<Pubkey, HostError> {
+    let value = non_empty_optional(value)
+        .ok_or_else(|| HostError::InvalidConfig(format!("{field} is required for {context}")))?;
+    Pubkey::from_str(value).map_err(|error| {
+        HostError::InvalidConfig(format!("{field} is not a valid pubkey `{value}`: {error}"))
+    })
+}
+
+#[cfg(feature = "provider-grpc")]
+/// Parses one list of pubkeys from the wire config.
+fn parse_pubkeys(values: &[String], field: &str) -> Result<Vec<Pubkey>, HostError> {
+    values
+        .iter()
+        .map(|value| {
+            Pubkey::from_str(value).map_err(|error| {
+                HostError::InvalidConfig(format!(
+                    "{field} contains invalid pubkey `{value}`: {error}"
+                ))
+            })
+        })
+        .collect()
+}
+
+/// Leaks one extension name for the lifetime expected by the host traits.
+fn leak_extension_name(name: String) -> Result<&'static str, HostError> { + if name.trim().is_empty() { + return Err(HostError::InvalidConfig( + "plugin worker declares an empty extension name".to_owned(), + )); + } + + Ok(Box::leak(name.into_boxed_str())) +} + +#[async_trait] +impl RuntimeExtension for TypeScriptRuntimeExtension { + fn name(&self) -> &'static str { + self.bridge.name + } + + async fn setup( + &self, + _ctx: ExtensionContext, + ) -> Result { + self.bridge + .ensure_started() + .await + .map_err(ExtensionSetupError::new)?; + extension_manifest_from_config(&self.bridge.config.manifest.manifest) + .map_err(ExtensionSetupError::new) + } + + async fn on_packet_received(&self, event: RuntimePacketEvent) { + self.bridge.deliver_packet(event).await; + } + + async fn shutdown(&self, _ctx: ExtensionContext) { + self.bridge.shutdown().await; + } +} + +#[async_trait] +#[cfg(feature = "provider-grpc")] +impl ObserverPlugin for TypeScriptObserverPlugin { + fn name(&self) -> &'static str { + self.bridge.name + } + + fn config(&self) -> PluginConfig { + PluginConfig { + transaction_log: self.config.transaction_log, + raw_packet: self.config.raw_packet, + shred: self.config.shred, + dataset: self.config.dataset, + transaction: self.config.transaction, + transaction_status: self.config.transaction_status, + transaction_commitment: self.config.transaction_commitment, + transaction_dispatch_mode: self.config.transaction_dispatch_mode, + transaction_batch: self.config.transaction_batch, + transaction_batch_dispatch_mode: self.config.transaction_batch_dispatch_mode, + transaction_view_batch: self.config.transaction_view_batch, + transaction_view_batch_dispatch_mode: self.config.transaction_view_batch_dispatch_mode, + account_touch: self.config.account_touch, + account_update: self.config.account_update, + block_meta: self.config.block_meta, + slot_status: self.config.slot_status, + reorg: self.config.reorg, + recent_blockhash: self.config.recent_blockhash, + 
cluster_topology: self.config.cluster_topology, + leader_schedule: self.config.leader_schedule, + } + } + + async fn setup(&self, _ctx: PluginContext) -> Result<(), PluginSetupError> { + self.bridge + .ensure_started() + .await + .map_err(PluginSetupError::new) + } + + async fn on_transaction(&self, event: &TransactionEvent) { + self.bridge + .deliver_provider_event(provider_transaction_event_wire(event)) + .await; + } + + async fn on_transaction_log(&self, event: &TransactionLogEvent) { + self.bridge + .deliver_provider_event(provider_transaction_log_event_wire(event)) + .await; + } + + async fn on_transaction_status(&self, event: &TransactionStatusEvent) { + self.bridge + .deliver_provider_event(provider_transaction_status_event_wire(event)) + .await; + } + + async fn on_account_update(&self, event: &AccountUpdateEvent) { + self.bridge + .deliver_provider_event(provider_account_update_event_wire(event)) + .await; + } + + async fn on_block_meta(&self, event: &BlockMetaEvent) { + self.bridge + .deliver_provider_event(provider_block_meta_event_wire(event)) + .await; + } + + async fn on_slot_status(&self, event: SlotStatusEvent) { + self.bridge + .deliver_provider_event(provider_slot_status_event_wire(&event)) + .await; + } + + async fn on_recent_blockhash(&self, event: ObservedRecentBlockhashEvent) { + self.bridge + .deliver_provider_event(provider_recent_blockhash_event_wire(&event)) + .await; + } + + async fn shutdown(&self, _ctx: PluginContext) { + self.bridge.shutdown().await; + } +} + +impl TypeScriptWorkerBridge { + /// Starts the shared worker process when it has not been started yet. 
+ async fn ensure_started(&self) -> Result<(), String> { + let mut guard = self.process.lock().await; + if guard.is_some() { + return Ok(()); + } + + let process = start_worker_process(self.name, &self.config).await?; + let (commands, receiver) = mpsc::channel(WORKER_COMMAND_QUEUE_CAPACITY); + let extension_name = self.name; + let task = tokio::spawn(async move { + run_worker_process_loop(extension_name, process, receiver).await; + }); + *guard = Some(TypeScriptWorkerHandle { commands, task }); + Ok(()) + } + + /// Delivers one packet event into the bound TypeScript worker. + async fn deliver_packet(&self, event: RuntimePacketEvent) { + let commands = { + let guard = self.process.lock().await; + let Some(handle) = guard.as_ref() else { + tracing::warn!( + extension = self.name, + "worker process is not available for packet" + ); + return; + }; + handle.commands.clone() + }; + + if let Err(error) = commands.send(TypeScriptWorkerCommand::Packet(event)).await { + tracing::warn!( + extension = self.name, + error = %error, + "failed to enqueue packet for worker" + ); + let mut guard = self.process.lock().await; + *guard = None; + } + } + + /// Delivers one provider event into the bound TypeScript worker. + #[cfg(feature = "provider-grpc")] + async fn deliver_provider_event(&self, event: Value) { + let commands = { + let guard = self.process.lock().await; + let Some(handle) = guard.as_ref() else { + tracing::warn!( + plugin = self.name, + "worker process is not available for provider event" + ); + return; + }; + handle.commands.clone() + }; + + if let Err(error) = commands + .send(TypeScriptWorkerCommand::ProviderEvent(event)) + .await + { + tracing::warn!( + plugin = self.name, + error = %error, + "failed to enqueue provider event for worker" + ); + let mut guard = self.process.lock().await; + *guard = None; + } + } + + /// Requests shutdown for the worker process when it is still running. 
+ async fn shutdown(&self) { + let mut guard = self.process.lock().await; + let Some(handle) = guard.take() else { + return; + }; + + let (completed_tx, completed_rx) = oneshot::channel(); + if let Err(error) = handle + .commands + .send(TypeScriptWorkerCommand::Shutdown { + completed: completed_tx, + }) + .await + { + tracing::warn!( + extension = self.name, + error = %error, + "failed to request worker shutdown" + ); + } else if let Err(error) = completed_rx.await { + tracing::warn!( + extension = self.name, + error = %error, + "worker shutdown task did not complete cleanly" + ); + } + + if let Err(error) = handle.task.await { + tracing::warn!(extension = self.name, error = %error, "failed to wait for worker process"); + } + } +} + +/// Owns the child process after startup and batches normal event delivery. +async fn run_worker_process_loop( + extension_name: &'static str, + mut process: TypeScriptWorkerProcess, + mut receiver: mpsc::Receiver, +) { + let mut pending = None; + + loop { + let command = match pending.take() { + Some(command) => command, + None => match receiver.recv().await { + Some(command) => command, + None => break, + }, + }; + + match command { + TypeScriptWorkerCommand::Packet(first_event) => { + let mut events = vec![first_event]; + while events.len() < WORKER_BATCH_MAX_EVENTS { + match receiver.try_recv() { + Ok(TypeScriptWorkerCommand::Packet(event)) => events.push(event), + Ok(other) => { + pending = Some(other); + break; + } + Err(_) => break, + } + } + if let Err(error) = send_packet_batch(&mut process, &events).await { + tracing::warn!(extension = extension_name, error = %error, "failed to deliver packet batch to worker"); + break; + } + } + #[cfg(feature = "provider-grpc")] + TypeScriptWorkerCommand::ProviderEvent(first_event) => { + let mut events = vec![first_event]; + while events.len() < WORKER_BATCH_MAX_EVENTS { + match receiver.try_recv() { + Ok(TypeScriptWorkerCommand::ProviderEvent(event)) => events.push(event), + Ok(other) => 
{ + pending = Some(other); + break; + } + Err(_) => break, + } + } + if let Err(error) = send_provider_event_batch(&mut process, &events).await { + tracing::warn!(plugin = extension_name, error = %error, "failed to deliver provider batch to worker"); + break; + } + } + TypeScriptWorkerCommand::Shutdown { completed } => { + if let Err(error) = send_worker_json_frame( + &mut process, + WORKER_FRAME_SHUTDOWN, + &json!({ + "context": WorkerContext { + extension_name, + }, + }), + ) + .await + { + tracing::warn!(extension = extension_name, error = %error, "failed to request worker shutdown"); + } else if let Err(error) = + read_worker_response(&mut process, RESPONSE_TAG_SHUTDOWN_COMPLETE) + .await + .and_then(|response| response_result_ok(extension_name, &response)) + { + tracing::warn!(extension = extension_name, error = %error, "worker shutdown was not acknowledged"); + } + + if let Err(error) = process.child.wait().await { + tracing::warn!(extension = extension_name, error = %error, "failed to wait for worker process"); + } + if completed.send(()).is_err() { + tracing::debug!( + extension = extension_name, + "worker shutdown completion receiver was dropped" + ); + } + break; + } + } + } +} + +/// Spawns one TypeScript worker and completes its manifest and startup handshake. 
+async fn start_worker_process( + extension_name: &str, + config: &PluginWorkerConfig, +) -> Result { + let mut process = spawn_worker(extension_name, config).await?; + send_worker_json_frame(&mut process, WORKER_FRAME_GET_MANIFEST, &json!({})).await?; + let manifest_response = read_worker_response(&mut process, RESPONSE_TAG_MANIFEST).await?; + response_result_ok(extension_name, &manifest_response)?; + + send_worker_json_frame( + &mut process, + WORKER_FRAME_START, + &json!({ + "context": WorkerContext { + extension_name, + }, + }), + ) + .await?; + let start_response = read_worker_response(&mut process, RESPONSE_TAG_STARTED).await?; + response_result_ok(extension_name, &start_response)?; + + Ok(process) +} + +/// Spawns one stdio worker process from the provided launch config. +async fn spawn_worker( + extension_name: &str, + config: &PluginWorkerConfig, +) -> Result { + if config.name != extension_name { + return Err(format!( + "worker `{}` does not match extension manifest name `{extension_name}`", + config.name + )); + } + + let mut command = Command::new(&config.command); + command + .args(&config.args) + .envs(&config.environment) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()); + let mut child = command + .spawn() + .map_err(|error| format!("failed to spawn worker `{extension_name}`: {error}"))?; + let stdin = child + .stdin + .take() + .ok_or_else(|| format!("worker `{extension_name}` did not expose stdin"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| format!("worker `{extension_name}` did not expose stdout"))?; + + Ok(TypeScriptWorkerProcess { + child, + stdin, + stdout: BufReader::new(stdout), + }) +} + +/// Serializes and writes one framed JSON message to the worker stdin. 
+async fn send_worker_json_frame( + process: &mut TypeScriptWorkerProcess, + tag: u8, + message: &Value, +) -> Result<(), String> { + let payload = serde_json::to_vec(message) + .map_err(|error| format!("failed to encode worker message: {error}"))?; + send_worker_frame(process, tag, &payload).await +} + +/// Writes one framed message to the worker stdin. +async fn send_worker_frame( + process: &mut TypeScriptWorkerProcess, + tag: u8, + payload: &[u8], +) -> Result<(), String> { + let payload_len = u32::try_from(payload.len()).map_err(|_error| { + format!( + "worker frame payload exceeded u32 length: {}", + payload.len() + ) + })?; + let frame_capacity = WORKER_FRAME_HEADER_LEN + .checked_add(payload.len()) + .ok_or_else(|| { + format!( + "worker frame payload exceeded usize length: {}", + payload.len() + ) + })?; + let mut frame = Vec::with_capacity(frame_capacity); + frame.push(tag); + frame.extend_from_slice(&payload_len.to_le_bytes()); + frame.extend_from_slice(payload); + process + .stdin + .write_all(&frame) + .await + .map_err(|error| format!("failed to write worker message: {error}"))?; + process + .stdin + .flush() + .await + .map_err(|error| format!("failed to flush worker message: {error}")) +} + +/// Reads one framed worker response and validates its protocol tag. +async fn read_worker_response( + process: &mut TypeScriptWorkerProcess, + expected_tag: u8, +) -> Result { + let (received_tag, payload) = read_worker_frame(process).await?; + if received_tag != expected_tag { + return Err(format!( + "worker response tag {received_tag} did not match expected tag {expected_tag}", + )); + } + serde_json::from_slice(&payload) + .map_err(|error| format!("worker response was invalid JSON: {error}")) +} + +/// Reads one framed worker response from stdout. 
+async fn read_worker_frame(process: &mut TypeScriptWorkerProcess) -> Result<(u8, Vec), String> { + let mut header = [0_u8; WORKER_FRAME_HEADER_LEN]; + process + .stdout + .read_exact(&mut header) + .await + .map_err(|error| format!("failed to read worker response header: {error}"))?; + let payload_len = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize; + let mut payload = vec![0_u8; payload_len]; + process + .stdout + .read_exact(&mut payload) + .await + .map_err(|error| format!("failed to read worker response payload: {error}"))?; + Ok((header[0], payload)) +} + +/// Sends one packet batch frame without waiting for an acknowledgement. +async fn send_packet_batch( + process: &mut TypeScriptWorkerProcess, + events: &[RuntimePacketEvent], +) -> Result<(), String> { + let payload = encode_packet_batch(events)?; + send_worker_frame(process, WORKER_FRAME_PACKET_BATCH, &payload).await +} + +/// Sends one provider-event batch frame without waiting for an acknowledgement. +#[cfg(feature = "provider-grpc")] +async fn send_provider_event_batch( + process: &mut TypeScriptWorkerProcess, + events: &[Value], +) -> Result<(), String> { + let payload = json!({ + "events": events, + }); + send_worker_json_frame(process, WORKER_FRAME_PROVIDER_BATCH, &payload).await +} + +/// Checks whether one worker response contains an OK result tag. 
+fn response_result_ok(extension_name: &str, response: &Value) -> Result<(), String> { + let result = response + .get("result") + .ok_or_else(|| format!("worker `{extension_name}` response omitted result"))?; + match result.get("tag").and_then(Value::as_u64) { + Some(value) if value == u64::from(RESULT_TAG_OK) => Ok(()), + Some(value) if value == u64::from(RESULT_TAG_ERR) => { + let message = result + .get("error") + .and_then(|error| error.get("message")) + .and_then(Value::as_str) + .unwrap_or("worker returned an error result"); + Err(format!( + "worker `{extension_name}` returned error: {message}" + )) + } + Some(other) => Err(format!( + "worker `{extension_name}` response used unsupported result tag {other}", + )), + None => Err(format!( + "worker `{extension_name}` response did not contain result tag", + )), + } +} + +/// Converts the TypeScript manifest wire config into a runtime manifest. +fn extension_manifest_from_config( + config: &ExtensionManifestConfig, +) -> Result { + Ok(ExtensionManifest { + capabilities: config + .capabilities + .iter() + .copied() + .map(extension_capability_from_wire) + .collect::, _>>()?, + resources: config + .resources + .iter() + .map(extension_resource_from_config) + .collect::, _>>()?, + subscriptions: config + .subscriptions + .iter() + .map(packet_subscription_from_config) + .collect::, _>>()?, + }) +} + +/// Converts one wire resource declaration into a runtime resource spec. 
+fn extension_resource_from_config( + config: &ExtensionResourceConfig, +) -> Result { + match config.kind { + 1 => Ok(ExtensionResourceSpec::UdpListener(UdpListenerSpec { + resource_id: config.resource_id.clone(), + bind_addr: parse_required_addr(config.bind_address.as_deref(), "bindAddress")?, + visibility: stream_visibility_from_config(&config.visibility)?, + read_buffer_bytes: config.read_buffer_bytes, + })), + 2 => Ok(ExtensionResourceSpec::TcpListener(TcpListenerSpec { + resource_id: config.resource_id.clone(), + bind_addr: parse_required_addr(config.bind_address.as_deref(), "bindAddress")?, + visibility: stream_visibility_from_config(&config.visibility)?, + read_buffer_bytes: config.read_buffer_bytes, + })), + 3 => Ok(ExtensionResourceSpec::TcpConnector(TcpConnectorSpec { + resource_id: config.resource_id.clone(), + remote_addr: parse_required_addr(config.remote_address.as_deref(), "remoteAddress")?, + visibility: stream_visibility_from_config(&config.visibility)?, + read_buffer_bytes: config.read_buffer_bytes, + })), + 4 => Ok(ExtensionResourceSpec::WsConnector(WsConnectorSpec { + resource_id: config.resource_id.clone(), + url: config + .url + .clone() + .ok_or_else(|| "websocket connector resource missing url".to_owned())?, + visibility: stream_visibility_from_config(&config.visibility)?, + read_buffer_bytes: config.read_buffer_bytes, + })), + other => Err(format!("unsupported extension resource kind {other}")), + } +} + +/// Converts the wire visibility policy into the runtime visibility enum. 
+fn stream_visibility_from_config( + config: &ExtensionStreamVisibilityConfig, +) -> Result { + match config.tag { + 1 => Ok(ExtensionStreamVisibility::Private), + 2 => Ok(ExtensionStreamVisibility::Shared { + tag: config + .shared_tag + .clone() + .ok_or_else(|| "shared stream visibility missing sharedTag".to_owned())?, + }), + other => Err(format!("unsupported stream visibility tag {other}")), + } +} + +/// Converts the wire packet subscription into the runtime subscription filter. +fn packet_subscription_from_config( + config: &PacketSubscriptionConfig, +) -> Result { + Ok(PacketSubscription { + source_kind: config + .source_kind + .map(runtime_packet_source_kind_from_wire) + .transpose()?, + transport: config + .transport + .map(runtime_packet_transport_from_wire) + .transpose()?, + event_class: config + .event_class + .map(runtime_packet_event_class_from_wire) + .transpose()?, + local_addr: config + .local_address + .as_deref() + .map(parse_addr) + .transpose()?, + local_port: config.local_port, + remote_addr: config + .remote_address + .as_deref() + .map(parse_addr) + .transpose()?, + remote_port: config.remote_port, + owner_extension: config.owner_extension.clone(), + resource_id: config.resource_id.clone(), + shared_tag: config.shared_tag.clone(), + websocket_frame_type: config + .web_socket_frame_type + .map(runtime_websocket_frame_type_from_wire) + .transpose()?, + }) +} + +/// Parses one required socket address field from a resource config. +fn parse_required_addr(value: Option<&str>, field: &str) -> Result { + value + .ok_or_else(|| format!("resource missing {field}")) + .and_then(parse_addr) +} + +/// Parses one socket address string used by extension resources. +fn parse_addr(value: &str) -> Result { + value + .parse() + .map_err(|error| format!("invalid socket address `{value}`: {error}")) +} + +/// Maps the wire extension capability into the runtime enum. 
+fn extension_capability_from_wire(value: u8) -> Result { + match value { + 1 => Ok(ExtensionCapability::BindUdp), + 2 => Ok(ExtensionCapability::BindTcp), + 3 => Ok(ExtensionCapability::ConnectTcp), + 4 => Ok(ExtensionCapability::ConnectWebSocket), + 5 => Ok(ExtensionCapability::ObserveObserverIngress), + 6 => Ok(ExtensionCapability::ObserveSharedExtensionStream), + other => Err(format!("unsupported extension capability {other}")), + } +} + +/// Maps the wire packet source-kind selector into the runtime enum. +fn runtime_packet_source_kind_from_wire(value: u8) -> Result { + match value { + 1 => Ok(RuntimePacketSourceKind::ObserverIngress), + 2 => Ok(RuntimePacketSourceKind::ExtensionResource), + other => Err(format!("unsupported runtime packet source kind {other}")), + } +} + +/// Maps the wire packet transport selector into the runtime enum. +fn runtime_packet_transport_from_wire(value: u8) -> Result { + match value { + 1 => Ok(RuntimePacketTransport::Udp), + 2 => Ok(RuntimePacketTransport::Tcp), + 3 => Ok(RuntimePacketTransport::WebSocket), + other => Err(format!("unsupported runtime packet transport {other}")), + } +} + +/// Maps the wire packet event-class selector into the runtime enum. +fn runtime_packet_event_class_from_wire(value: u8) -> Result { + match value { + 1 => Ok(RuntimePacketEventClass::Packet), + 2 => Ok(RuntimePacketEventClass::ConnectionClosed), + other => Err(format!("unsupported runtime packet event class {other}")), + } +} + +/// Maps the wire websocket frame selector into the runtime enum. +fn runtime_websocket_frame_type_from_wire(value: u8) -> Result { + match value { + 1 => Ok(RuntimeWebSocketFrameType::Text), + 2 => Ok(RuntimeWebSocketFrameType::Binary), + 3 => Ok(RuntimeWebSocketFrameType::Ping), + 4 => Ok(RuntimeWebSocketFrameType::Pong), + other => Err(format!("unsupported websocket frame type {other}")), + } +} + +/// Packet-source flag indicating that an owner extension string follows. 
+const PACKET_SOURCE_OWNER_EXTENSION: u8 = 1 << 0; +/// Packet-source flag indicating that a resource identifier string follows. +const PACKET_SOURCE_RESOURCE_ID: u8 = 1 << 1; +/// Packet-source flag indicating that a shared stream tag string follows. +const PACKET_SOURCE_SHARED_TAG: u8 = 1 << 2; +/// Packet-source flag indicating that a websocket frame type byte follows. +const PACKET_SOURCE_WEBSOCKET_FRAME_TYPE: u8 = 1 << 3; +/// Packet-source flag indicating that a local socket address string follows. +const PACKET_SOURCE_LOCAL_ADDR: u8 = 1 << 4; +/// Packet-source flag indicating that a remote socket address string follows. +const PACKET_SOURCE_REMOTE_ADDR: u8 = 1 << 5; + +/// Serializes runtime packet events into a compact worker-only binary batch. +fn encode_packet_batch(events: &[RuntimePacketEvent]) -> Result, String> { + let mut payload = Vec::new(); + encode_u32(&mut payload, events.len(), "packet batch event count")?; + + for event in events { + payload.push(runtime_packet_source_kind_to_wire(event.source.kind)); + payload.push(runtime_packet_transport_to_wire(event.source.transport)); + payload.push(runtime_packet_event_class_to_wire(event.source.event_class)); + payload.push(packet_source_flags(&event.source)); + payload.extend_from_slice(&event.observed_unix_ms.to_le_bytes()); + + if let Some(frame_type) = event.source.websocket_frame_type { + payload.push(runtime_websocket_frame_type_to_wire(frame_type)); + } + encode_optional_string( + &mut payload, + event.source.owner_extension.as_deref(), + "packet source owner extension", + )?; + encode_optional_string( + &mut payload, + event.source.resource_id.as_deref(), + "packet source resource id", + )?; + encode_optional_string( + &mut payload, + event.source.shared_tag.as_deref(), + "packet source shared tag", + )?; + encode_optional_string( + &mut payload, + event + .source + .local_addr + .map(|addr| addr.to_string()) + .as_deref(), + "packet source local address", + )?; + encode_optional_string( + 
&mut payload, + event + .source + .remote_addr + .map(|addr| addr.to_string()) + .as_deref(), + "packet source remote address", + )?; + encode_u32(&mut payload, event.bytes.len(), "packet bytes length")?; + payload.extend_from_slice(event.bytes.as_ref()); + } + + Ok(payload) +} + +/// Computes the optional-field bitset for one packet source. +const fn packet_source_flags(source: &RuntimePacketSource) -> u8 { + let mut flags = 0_u8; + if source.owner_extension.is_some() { + flags |= PACKET_SOURCE_OWNER_EXTENSION; + } + if source.resource_id.is_some() { + flags |= PACKET_SOURCE_RESOURCE_ID; + } + if source.shared_tag.is_some() { + flags |= PACKET_SOURCE_SHARED_TAG; + } + if source.websocket_frame_type.is_some() { + flags |= PACKET_SOURCE_WEBSOCKET_FRAME_TYPE; + } + if source.local_addr.is_some() { + flags |= PACKET_SOURCE_LOCAL_ADDR; + } + if source.remote_addr.is_some() { + flags |= PACKET_SOURCE_REMOTE_ADDR; + } + flags +} + +/// Encodes an optional string field when its packet-source flag is present. +fn encode_optional_string( + payload: &mut Vec, + value: Option<&str>, + field: &str, +) -> Result<(), String> { + if let Some(value) = value { + encode_string(payload, value, field)?; + } + Ok(()) +} + +/// Encodes one length-prefixed UTF-8 string. +fn encode_string(payload: &mut Vec, value: &str, field: &str) -> Result<(), String> { + let bytes = value.as_bytes(); + encode_u32(payload, bytes.len(), field)?; + payload.extend_from_slice(bytes); + Ok(()) +} + +/// Encodes one little-endian `u32` after bounds checking a `usize`. +fn encode_u32(payload: &mut Vec, value: usize, field: &str) -> Result<(), String> { + let value = + u32::try_from(value).map_err(|_error| format!("{field} exceeded u32 length: {value}"))?; + payload.extend_from_slice(&value.to_le_bytes()); + Ok(()) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one transaction event for the worker protocol. 
+fn provider_transaction_event_wire(event: &TransactionEvent) -> Value { + json!({ + "kind": 1, + "slot": event.slot, + "commitmentStatus": commitment_status_to_wire(event.commitment_status), + "confirmedSlot": event.confirmed_slot, + "finalizedSlot": event.finalized_slot, + "signature": event.signature.map(|signature| signature.to_base58()), + "providerSource": provider_source_wire(event.provider_source.as_ref()), + "transactionKind": tx_kind_to_wire(event.kind), + "transactionBase64": bincode::serialize(event.tx.as_ref()) + .ok() + .map(|bytes| BASE64_STANDARD.encode(bytes)), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one transaction-log event for the worker protocol. +fn provider_transaction_log_event_wire(event: &TransactionLogEvent) -> Value { + json!({ + "kind": 2, + "slot": event.slot, + "commitmentStatus": commitment_status_to_wire(event.commitment_status), + "signature": event.signature.to_base58(), + "err": event.err, + "logs": event.logs.as_ref(), + "matchedFilter": event.matched_filter.map(|pubkey| pubkey.to_base58()), + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one transaction-status event for the worker protocol. +fn provider_transaction_status_event_wire(event: &TransactionStatusEvent) -> Value { + json!({ + "kind": 3, + "slot": event.slot, + "commitmentStatus": commitment_status_to_wire(event.commitment_status), + "confirmedSlot": event.confirmed_slot, + "finalizedSlot": event.finalized_slot, + "signature": event.signature.to_base58(), + "isVote": event.is_vote, + "index": event.index, + "err": event.err, + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one account-update event for the worker protocol. 
+fn provider_account_update_event_wire(event: &AccountUpdateEvent) -> Value { + json!({ + "kind": 4, + "slot": event.slot, + "commitmentStatus": commitment_status_to_wire(event.commitment_status), + "confirmedSlot": event.confirmed_slot, + "finalizedSlot": event.finalized_slot, + "pubkey": event.pubkey.to_base58(), + "owner": event.owner.to_base58(), + "lamports": event.lamports, + "executable": event.executable, + "rentEpoch": event.rent_epoch, + "dataBase64": BASE64_STANDARD.encode(event.data.as_ref()), + "writeVersion": event.write_version, + "txnSignature": event.txn_signature.map(|signature| signature.to_base58()), + "isStartup": event.is_startup, + "matchedFilter": event.matched_filter.map(|pubkey| pubkey.to_base58()), + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one block-meta event for the worker protocol. +fn provider_block_meta_event_wire(event: &BlockMetaEvent) -> Value { + json!({ + "kind": 5, + "slot": event.slot, + "commitmentStatus": commitment_status_to_wire(event.commitment_status), + "confirmedSlot": event.confirmed_slot, + "finalizedSlot": event.finalized_slot, + "blockhash": solana_hash::Hash::new_from_array(event.blockhash).to_string(), + "parentSlot": event.parent_slot, + "parentBlockhash": solana_hash::Hash::new_from_array(event.parent_blockhash).to_string(), + "blockTime": event.block_time, + "blockHeight": event.block_height, + "executedTransactionCount": event.executed_transaction_count, + "entriesCount": event.entries_count, + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one slot-status event for the worker protocol. 
+fn provider_slot_status_event_wire(event: &SlotStatusEvent) -> Value { + json!({ + "kind": 6, + "slot": event.slot, + "parentSlot": event.parent_slot, + "previousStatus": event.previous_status.map(fork_slot_status_to_wire), + "status": fork_slot_status_to_wire(event.status), + "tipSlot": event.tip_slot, + "confirmedSlot": event.confirmed_slot, + "finalizedSlot": event.finalized_slot, + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one recent-blockhash event for the worker protocol. +fn provider_recent_blockhash_event_wire(event: &ObservedRecentBlockhashEvent) -> Value { + json!({ + "kind": 7, + "slot": event.slot, + "recentBlockhash": solana_hash::Hash::new_from_array(event.recent_blockhash).to_string(), + "datasetTxCount": event.dataset_tx_count, + "providerSource": provider_source_wire(event.provider_source.as_ref()), + }) +} + +#[cfg(feature = "provider-grpc")] +/// Serializes one provider-source reference for the worker protocol. +fn provider_source_wire(source: Option<&ProviderSourceRef>) -> Option { + source.map(|source| { + json!({ + "kind": source.kind_str(), + "instance": source.instance_str(), + "priority": source.priority(), + "role": source.role().as_str(), + "arbitration": source.arbitration().as_str(), + }) + }) +} + +#[cfg(feature = "provider-grpc")] +/// Maps one commitment status into the worker wire tag. +const fn commitment_status_to_wire(status: TxCommitmentStatus) -> u8 { + match status { + TxCommitmentStatus::Processed => 1, + TxCommitmentStatus::Confirmed => 2, + TxCommitmentStatus::Finalized => 3, + } +} + +#[cfg(feature = "provider-grpc")] +/// Maps one transaction kind into the worker wire tag. +const fn tx_kind_to_wire(kind: TxKind) -> u8 { + match kind { + TxKind::VoteOnly => 1, + TxKind::Mixed => 2, + TxKind::NonVote => 3, + } +} + +#[cfg(feature = "provider-grpc")] +/// Maps one fork slot status into the worker wire tag. 
+const fn fork_slot_status_to_wire(status: ForkSlotStatus) -> u8 { + match status { + ForkSlotStatus::Processed => 1, + ForkSlotStatus::Confirmed => 2, + ForkSlotStatus::Finalized => 3, + ForkSlotStatus::Orphaned => 4, + } +} + +/// Maps one runtime packet source kind into the worker wire tag. +const fn runtime_packet_source_kind_to_wire(value: RuntimePacketSourceKind) -> u8 { + match value { + RuntimePacketSourceKind::ObserverIngress => 1, + RuntimePacketSourceKind::ExtensionResource => 2, + } +} + +/// Maps one runtime packet transport into the worker wire tag. +const fn runtime_packet_transport_to_wire(value: RuntimePacketTransport) -> u8 { + match value { + RuntimePacketTransport::Udp => 1, + RuntimePacketTransport::Tcp => 2, + RuntimePacketTransport::WebSocket => 3, + } +} + +/// Maps one runtime packet event class into the worker wire tag. +const fn runtime_packet_event_class_to_wire(value: RuntimePacketEventClass) -> u8 { + match value { + RuntimePacketEventClass::Packet => 1, + RuntimePacketEventClass::ConnectionClosed => 2, + } +} + +/// Maps one websocket frame type into the worker wire tag. 
+const fn runtime_websocket_frame_type_to_wire(value: RuntimeWebSocketFrameType) -> u8 { + match value { + RuntimeWebSocketFrameType::Text => 1, + RuntimeWebSocketFrameType::Binary => 2, + RuntimeWebSocketFrameType::Ping => 3, + RuntimeWebSocketFrameType::Pong => 4, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn websocket_ingress(name: &str) -> IngressConfig { + IngressConfig { + kind: INGRESS_KIND_WEB_SOCKET, + name: name.to_owned(), + bind_address: None, + #[cfg(feature = "provider-grpc")] + endpoint: None, + #[cfg(feature = "provider-grpc")] + stream: None, + #[cfg(feature = "provider-grpc")] + x_token: None, + #[cfg(feature = "provider-grpc")] + commitment: None, + #[cfg(feature = "provider-grpc")] + vote: None, + #[cfg(feature = "provider-grpc")] + failed: None, + #[cfg(feature = "provider-grpc")] + signature: None, + #[cfg(feature = "provider-grpc")] + account_include: None, + #[cfg(feature = "provider-grpc")] + account_exclude: None, + #[cfg(feature = "provider-grpc")] + account_required: None, + #[cfg(feature = "provider-grpc")] + accounts: None, + #[cfg(feature = "provider-grpc")] + owners: None, + #[cfg(feature = "provider-grpc")] + require_transaction_signature: None, + #[cfg(feature = "provider-grpc")] + readiness: None, + #[cfg(feature = "provider-grpc")] + role: None, + #[cfg(feature = "provider-grpc")] + priority: None, + url: Some("wss://example.invalid".to_owned()), + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + account: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + program_id: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + logs_filter: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + mentions: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + requests: Some(vec![]), + entrypoints: None, + #[cfg(feature = "gossip-bootstrap")] + runtime_mode: None, + entrypoint_pinned: None, + 
kernel_bypass: None, + } + } + + fn direct_shreds_ingress(name: &str) -> IngressConfig { + IngressConfig { + kind: INGRESS_KIND_DIRECT_SHREDS, + name: name.to_owned(), + bind_address: Some("127.0.0.1:20000".to_owned()), + #[cfg(feature = "provider-grpc")] + endpoint: None, + #[cfg(feature = "provider-grpc")] + stream: None, + #[cfg(feature = "provider-grpc")] + x_token: None, + #[cfg(feature = "provider-grpc")] + commitment: None, + #[cfg(feature = "provider-grpc")] + vote: None, + #[cfg(feature = "provider-grpc")] + failed: None, + #[cfg(feature = "provider-grpc")] + signature: None, + #[cfg(feature = "provider-grpc")] + account_include: None, + #[cfg(feature = "provider-grpc")] + account_exclude: None, + #[cfg(feature = "provider-grpc")] + account_required: None, + #[cfg(feature = "provider-grpc")] + accounts: None, + #[cfg(feature = "provider-grpc")] + owners: None, + #[cfg(feature = "provider-grpc")] + require_transaction_signature: None, + #[cfg(feature = "provider-grpc")] + readiness: None, + #[cfg(feature = "provider-grpc")] + role: None, + #[cfg(feature = "provider-grpc")] + priority: None, + url: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + account: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + program_id: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + logs_filter: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + mentions: None, + #[cfg(all(feature = "provider-grpc", feature = "provider-websocket"))] + requests: None, + entrypoints: None, + #[cfg(feature = "gossip-bootstrap")] + runtime_mode: None, + entrypoint_pinned: None, + kernel_bypass: None, + } + } + + #[test] + fn validate_ingress_accepts_mixed_websocket_and_raw_runtime_config() { + let config = RuntimeHostConfig { + app_name: "mixed-app".to_owned(), + runtime_environment: HashMap::new(), + ingress: vec![websocket_ingress("ws-a"), 
direct_shreds_ingress("direct-a")], + #[cfg(feature = "provider-grpc")] + fan_in: Some(FanInConfig { strategy: 2 }), + plugin_workers: vec![], + }; + + let result = validate_ingress(&config); + assert!(result.is_ok()); + } + + #[cfg(feature = "provider-grpc")] + #[test] + fn provider_plugin_config_matches_websocket_logs_mode() { + let config = provider_plugin_config(&[ProviderStreamMode::WebsocketLogs]); + + assert!(config.transaction_log); + assert!(!config.transaction); + assert!(!config.transaction_status); + assert!(!config.account_update); + assert!(!config.block_meta); + assert!(!config.slot_status); + assert!(!config.recent_blockhash); + } + + #[cfg(feature = "provider-grpc")] + #[test] + fn provider_plugin_config_unions_multiple_provider_modes() { + let config = provider_plugin_config(&[ + ProviderStreamMode::YellowstoneGrpcTransactionStatus, + ProviderStreamMode::YellowstoneGrpcAccounts, + ]); + + assert!(!config.transaction); + assert!(config.transaction_status); + assert!(config.account_update); + assert!(!config.block_meta); + assert!(!config.slot_status); + assert!(!config.recent_blockhash); + assert!(!config.transaction_log); + } +} diff --git a/crates/sof-observer/src/bin/sof_ts_runtime_host/af_xdp.rs b/crates/sof-observer/src/bin/sof_ts_runtime_host/af_xdp.rs new file mode 100644 index 00000000..87f04444 --- /dev/null +++ b/crates/sof-observer/src/bin/sof_ts_runtime_host/af_xdp.rs @@ -0,0 +1,297 @@ +//! AF_XDP packet producer used by the TypeScript runtime host on Linux. + +use std::{ + ffi::CString, + io, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; + +use sof::{ + ingest::{RawPacketBatch, RawPacketIngress}, + runtime::KernelBypassIngressSender, +}; +use xdp::{ + RingConfigBuilder, Umem, + slab::{HeapSlab, Slab}, + socket::{PollTimeout, XdpSocket, XdpSocketBuilder}, + umem::{FrameSize, UmemCfgBuilder}, +}; + +/// Error type returned by the AF_XDP helper module. 
+pub(super) type AfXdpError = Box; + +/// AF_XDP socket and batching settings for direct raw-shred ingest. +#[derive(Debug, Clone, Eq, PartialEq)] +pub(super) struct AfXdpConfig { + /// Network interface name used for the XDP bind. + pub(super) interface: String, + /// Receive queue id bound on the target interface. + pub(super) queue_id: u32, + /// Maximum number of packets forwarded in one batch. + pub(super) batch_size: usize, + /// Number of UMEM frames allocated for the socket. + pub(super) umem_frame_count: u32, + /// RX/fill/completion ring depth. + pub(super) ring_depth: u32, + /// Poll timeout used while waiting for packets. + pub(super) poll_timeout: Duration, + /// Destination UDP port filter applied before forwarding. + pub(super) filter: PortFilter, +} + +/// UDP destination ports accepted by the AF_XDP producer. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(super) struct PortFilter { + /// Inclusive lower bound of the accepted port range. + pub(super) range_start: u16, + /// Inclusive upper bound of the accepted port range. + pub(super) range_end: u16, + /// One additional accepted port outside the main range. + pub(super) extra_port: u16, +} + +impl PortFilter { + /// Returns the default Solana TPU and TVU destination port filter. + pub(super) const fn default_sol() -> Self { + Self { + range_start: 12_000, + range_end: 12_100, + extra_port: 8_001, + } + } + + /// Returns whether one UDP destination port should be forwarded. + const fn allows(self, port: u16) -> bool { + (port >= self.range_start && port <= self.range_end) || port == self.extra_port + } +} + +/// Live AF_XDP socket state used while receiving packets. +struct AfXdpSocketState { + /// Bound XDP socket. + socket: XdpSocket, + /// Wakeable RX/fill/completion rings paired with the socket. + rings: xdp::WakableRings, + /// Shared packet memory region used by the rings. + umem: Umem, +} + +/// Internal packet parse outcome while decoding one Ethernet frame. 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum FrameParseOutcome {
    /// The frame was malformed for the expected IPv4 UDP layout.
    ParseError,
}

/// Runs the AF_XDP receive loop until shutdown is requested.
///
/// Polls the socket, drains received frames from the RX ring, parses each
/// frame as Ethernet/IPv4/UDP, forwards accepted payloads to `tx` in batches
/// of up to `config.batch_size`, and recycles frames into the fill ring.
/// Returns `Ok(())` when `shutdown` is observed or `tx` reports a closed sink.
///
/// # Errors
/// Propagates socket setup failures, a missing RX ring, fill-ring enqueue
/// failures, and `push_packet_bytes` errors as `AfXdpError`.
pub(super) fn run_af_xdp_producer_until(
    tx: &KernelBypassIngressSender,
    config: &AfXdpConfig,
    shutdown: &AtomicBool,
) -> Result<(), AfXdpError> {
    let mut state = build_af_xdp_socket(config)?;
    let mut slab = HeapSlab::with_capacity(usize::try_from(config.ring_depth).unwrap_or(0));
    let mut batch = RawPacketBatch::with_capacity(config.batch_size);

    while !shutdown.load(Ordering::Relaxed) {
        // Poll result is intentionally discarded: a timeout or error simply
        // leads to an empty RX drain below, and the loop re-checks shutdown.
        drop(
            state
                .socket
                .poll_read(PollTimeout::new(Some(config.poll_timeout))),
        );

        if shutdown.load(Ordering::Relaxed) {
            break;
        }

        let Some(rx_ring) = state.rings.rx_ring.as_mut() else {
            return Err(io::Error::other("AF_XDP socket has no RX ring configured").into());
        };
        // SAFETY: `rx_ring` is paired with `state.umem` from the same socket setup,
        // and `slab` is the destination scratch storage expected by the xdp crate.
        let received = unsafe { rx_ring.recv(&state.umem, &mut slab) };
        if received == 0 {
            continue;
        }

        for _ in 0..received {
            let Some(frame) = slab.pop_back() else {
                break;
            };
            match parse_udp_payload_to_raw_packet(&frame, config.filter) {
                Ok(Some((source, payload))) => {
                    batch.push_packet_bytes(source, RawPacketIngress::Udp, payload)?;
                    if batch.len() >= config.batch_size {
                        // `send_batch` returning false means the sink is gone:
                        // free the frame in hand and exit cleanly.
                        if !tx.send_batch(std::mem::take(&mut batch), false) {
                            state.umem.free_packet(frame);
                            return Ok(());
                        }
                        batch = RawPacketBatch::with_capacity(config.batch_size);
                    }
                }
                // Non-IPv4/non-UDP/filtered frames and malformed frames are
                // both dropped silently; only the frame itself is recycled.
                Ok(None) | Err(FrameParseOutcome::ParseError) => {}
            }
            state.umem.free_packet(frame);
        }

        // SAFETY: Returned RX buffers are recycled into the matching fill ring for the
        // same `umem`, bounded by the number of descriptors just received.
        unsafe {
            state
                .rings
                .fill_ring
                .enqueue(&mut state.umem, received, true)?
        };
    }

    // Flush any partial batch on shutdown; delivery failure is ignored here.
    if !batch.is_empty() {
        let _sent = tx.send_batch(batch, false);
    }

    Ok(())
}

/// Creates and primes one AF_XDP socket using the requested config.
///
/// Maps the UMEM, builds wakeable rings at `config.ring_depth`, binds the
/// socket to `config.interface`/`config.queue_id`, and pre-fills the fill
/// ring so the kernel has buffers before the first poll.
// NOTE(review): the return type's generic parameters were stripped from the
// transmitted text; `Result<AfXdpSocketState, AfXdpError>` is reconstructed
// from the `Ok(AfXdpSocketState { .. })` value and `?` usage — confirm.
fn build_af_xdp_socket(config: &AfXdpConfig) -> Result<AfXdpSocketState, AfXdpError> {
    let ifname = CString::new(config.interface.clone())?;
    let nic = xdp::nic::NicIndex::lookup_by_name(&ifname)?.ok_or_else(|| {
        io::Error::other(format!(
            "network interface `{}` not found",
            config.interface
        ))
    })?;

    let mut builder = XdpSocketBuilder::new()?;
    let umem_cfg = UmemCfgBuilder {
        frame_size: FrameSize::TwoK,
        frame_count: config.umem_frame_count,
        ..Default::default()
    }
    .build()?;
    let mut umem = Umem::map(umem_cfg)?;
    let ring_cfg = RingConfigBuilder {
        rx_count: config.ring_depth,
        tx_count: 0,
        fill_count: config.ring_depth,
        completion_count: config.ring_depth,
    }
    .build()?;
    let (mut rings, bind_flags) = builder.build_wakable_rings(&umem, ring_cfg)?;
    let socket = builder.bind(nic, config.queue_id, bind_flags)?;

    // SAFETY: `umem` and `fill_ring` were created together by `build_wakable_rings`,
    // and the requested descriptor count is bounded by the configured ring depth.
    let _fill_queued = unsafe {
        rings.fill_ring.enqueue(
            &mut umem,
            usize::try_from(config.ring_depth).unwrap_or(0),
            true,
        )?
    };

    Ok(AfXdpSocketState {
        socket,
        rings,
        umem,
    })
}

/// Parses one Ethernet frame into a raw UDP payload and source address.
///
/// Returns `Ok(None)` for frames that are valid but not of interest
/// (non-IPv4, non-UDP, non-initial fragments, or ports rejected by `filter`)
/// and `Err(ParseError)` for frames too short or internally inconsistent.
// NOTE(review): non-zero fragment offsets are skipped, but a first fragment
// (MF flag set, offset 0) is still forwarded with a truncated payload —
// confirm that is acceptable for shred traffic.
// NOTE(review): return type reconstructed as
// `Result<Option<(SocketAddr, &[u8])>, FrameParseOutcome>` from the
// `Ok(Some((source, payload-slice)))` value below — generics were stripped.
fn parse_udp_payload_to_raw_packet(
    frame: &[u8],
    filter: PortFilter,
) -> Result<Option<(SocketAddr, &[u8])>, FrameParseOutcome> {
    const ETH_LEN: usize = 14;
    const ETH_TYPE_OFFSET: usize = 12;
    const ETH_P_IPV4: u16 = 0x0800;
    const IP_PROTO_UDP: u8 = 17;
    // Minimum: Ethernet header plus a no-options IPv4 header.
    if frame.len() < ETH_LEN + 20 {
        return Err(FrameParseOutcome::ParseError);
    }

    if read_u16_be(frame, ETH_TYPE_OFFSET)? != ETH_P_IPV4 {
        return Ok(None);
    }

    let ip_offset = ETH_LEN;
    let version_ihl = read_u8(frame, ip_offset)?;
    if version_ihl >> 4 != 4 {
        return Ok(None);
    }
    // IHL is in 32-bit words; a value below 5 (20 bytes) is rejected below.
    let ihl = usize::from(version_ihl & 0x0F).checked_mul(4).unwrap_or(0);
    let udp_header_end = checked_add(checked_add(ip_offset, ihl)?, 8)?;
    if ihl < 20 || frame.len() < udp_header_end {
        return Err(FrameParseOutcome::ParseError);
    }
    if read_u8(frame, checked_add(ip_offset, 9)?)? != IP_PROTO_UDP {
        return Ok(None);
    }
    // Low 13 bits of the flags/fragment-offset field: skip later fragments,
    // which do not carry a UDP header.
    let frag_field = read_u16_be(frame, checked_add(ip_offset, 6)?)?;
    if (frag_field & 0x1FFF) != 0 {
        return Ok(None);
    }

    let udp_offset = checked_add(ip_offset, ihl)?;
    let src_port = read_u16_be(frame, udp_offset)?;
    let dst_port = read_u16_be(frame, checked_add(udp_offset, 2)?)?;
    if !filter.allows(dst_port) {
        return Ok(None);
    }
    // UDP length field covers the 8-byte header plus payload.
    let udp_len = usize::from(read_u16_be(frame, checked_add(udp_offset, 4)?)?);
    if udp_len < 8 {
        return Err(FrameParseOutcome::ParseError);
    }
    let payload_start = checked_add(udp_offset, 8)?;
    let payload_end = checked_add(payload_start, udp_len.saturating_sub(8))?;
    if payload_end > frame.len() {
        return Err(FrameParseOutcome::ParseError);
    }

    let source = SocketAddr::new(
        IpAddr::V4(read_ipv4_addr(frame, checked_add(ip_offset, 12)?)?),
        src_port,
    );
    Ok(Some((
        source,
        frame
            .get(payload_start..payload_end)
            .ok_or(FrameParseOutcome::ParseError)?,
    )))
}

/// Adds two byte offsets while rejecting overflow.
fn checked_add(lhs: usize, rhs: usize) -> Result<usize, FrameParseOutcome> {
    lhs.checked_add(rhs).ok_or(FrameParseOutcome::ParseError)
}

/// Reads one byte from the frame at the requested offset.
fn read_u8(frame: &[u8], offset: usize) -> Result<u8, FrameParseOutcome> {
    frame
        .get(offset)
        .copied()
        .ok_or(FrameParseOutcome::ParseError)
}

/// Reads one big-endian `u16` from the frame at the requested offset.
// NOTE(review): the `Result` generic parameters on the two helpers below were
// stripped from the transmitted text; `Result<u16, FrameParseOutcome>` and
// `Result<Ipv4Addr, FrameParseOutcome>` are reconstructed from the returned
// values — confirm against the original source.
fn read_u16_be(frame: &[u8], offset: usize) -> Result<u16, FrameParseOutcome> {
    let end = checked_add(offset, 2)?;
    // Bounds failure or a short slice both map to ParseError.
    let bytes: [u8; 2] = frame
        .get(offset..end)
        .ok_or(FrameParseOutcome::ParseError)?
        .try_into()
        .map_err(|_slice_error| FrameParseOutcome::ParseError)?;
    Ok(u16::from_be_bytes(bytes))
}

/// Reads one IPv4 address from the frame at the requested offset.
fn read_ipv4_addr(frame: &[u8], offset: usize) -> Result<Ipv4Addr, FrameParseOutcome> {
    let end = checked_add(offset, 4)?;
    // Bounds failure or a short slice both map to ParseError.
    let octets: [u8; 4] = frame
        .get(offset..end)
        .ok_or(FrameParseOutcome::ParseError)?
        .try_into()
        .map_err(|_slice_error| FrameParseOutcome::ParseError)?;
    Ok(Ipv4Addr::from(octets))
}
diff --git a/crates/sof-observer/src/runtime.rs index 4bdf122a..5f6c4a0a 100644 --- a/crates/sof-observer/src/runtime.rs +++ b/crates/sof-observer/src/runtime.rs @@ -27,12 +27,12 @@ use crate::{ app::runtime as app_runtime, event::TxCommitmentStatus, framework, provider_stream, runtime_env, }; use agave_transaction_view::transaction_view::SanitizedTransactionView; -use app_runtime::RuntimeObservabilityService; #[cfg(feature = "gossip-bootstrap")] use app_runtime::{ ClusterTopologyTracker, ProviderStreamGossipControlPlane, start_provider_stream_gossip_control_plane, }; +use app_runtime::{RuntimeObservabilityHandle, RuntimeObservabilityService}; use provider_stream::{ ProviderSourceHealthEvent, ProviderSourceHealthStatus, ProviderSourceId, ProviderSourceIdentity, ProviderStreamMode, ProviderStreamReceiver, ProviderStreamUpdate, @@ -1678,6 +1678,8 @@ pub struct ObserverRuntime { packet_ingest_rx: Option, /// Optional externally supplied processed provider-stream receiver. provider_stream: Option<(ProviderStreamMode, ProviderStreamReceiver)>, + /// Whether processed provider-stream ingress should run alongside raw packet ingress.
+ run_raw_ingress_with_provider_stream: bool, } impl ObserverRuntime { @@ -1779,6 +1781,16 @@ impl ObserverRuntime { self } + /// Keeps raw packet ingress active when provider-stream ingress is also configured. + /// + /// Provider-stream ingress normally replaces raw packet ingress. Use this only when a runtime + /// composition intentionally needs both raw packets and processed provider updates. + #[must_use] + pub const fn with_raw_ingress_alongside_provider_stream(mut self) -> Self { + self.run_raw_ingress_with_provider_stream = true; + self + } + #[cfg(feature = "kernel-bypass")] /// Replaces the built-in UDP ingress with an externally supplied kernel-bypass ingress receiver. #[must_use] @@ -1805,6 +1817,19 @@ impl ObserverRuntime { runtime_env::clear_runtime_env_overrides(); self.setup.apply(); if let Some((mode, provider_stream_rx)) = self.provider_stream { + if self.run_raw_ingress_with_provider_stream { + return run_raw_and_provider_stream_runtime( + self.plugin_host, + self.extension_host, + self.derived_state_host, + shutdown_signal, + mode, + provider_stream_rx, + #[cfg(feature = "kernel-bypass")] + self.packet_ingest_rx, + ) + .await; + } return run_provider_stream_runtime( self.plugin_host, self.extension_host, @@ -1892,11 +1917,11 @@ async fn run_provider_stream_runtime( derived_state_host: DerivedStateHost, shutdown_signal: Option, mode: ProviderStreamMode, - mut provider_stream_rx: ProviderStreamReceiver, + provider_stream_rx: ProviderStreamReceiver, ) -> Result<(), RuntimeError> { let capability_check = enforce_provider_stream_capability_policy(mode, &plugin_host, &derived_state_host)?; - let mut gossip_control_plane = + let gossip_control_plane = bootstrap_provider_stream_gossip_control_plane(mode, &plugin_host, &derived_state_host) .await?; let observability = if let Some(bind_addr) = read_observability_bind_addr() { @@ -1946,6 +1971,158 @@ async fn run_provider_stream_runtime( derived_state_host.initialize(); tracing::info!(mode = 
mode.as_str(), "starting SOF provider-stream runtime"); + let result = run_provider_stream_loop( + plugin_host.clone(), + derived_state_host.clone(), + shutdown_signal, + mode, + provider_stream_rx, + observability_handle, + gossip_control_plane, + ) + .await; + + plugin_host.shutdown().await; + extension_host.shutdown().await; + if let Some(service) = observability { + service.shutdown().await; + } + result +} + +async fn run_raw_and_provider_stream_runtime( + plugin_host: PluginHost, + extension_host: RuntimeExtensionHost, + derived_state_host: DerivedStateHost, + shutdown_signal: Option, + mode: ProviderStreamMode, + provider_stream_rx: ProviderStreamReceiver, + #[cfg(feature = "kernel-bypass")] packet_ingest_rx: Option, +) -> Result<(), RuntimeError> { + enforce_provider_stream_capability_policy(mode, &plugin_host, &derived_state_host)?; + let gossip_control_plane = + bootstrap_provider_stream_gossip_control_plane(mode, &plugin_host, &derived_state_host) + .await?; + plugin_host + .startup() + .await + .map_err(|error| ProviderStreamRuntimeError::Startup { + message: error.to_string(), + })?; + derived_state_host.initialize(); + tracing::info!( + mode = mode.as_str(), + "starting SOF combined raw and provider-stream runtime" + ); + + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + if let Some(signal) = shutdown_signal { + let external_shutdown_tx = shutdown_tx.clone(); + tokio::spawn(async move { + signal.await; + let _ignored = external_shutdown_tx.send(true); + }); + } + let raw_shutdown = Some(shared_shutdown_signal(shutdown_rx.clone())); + let provider_shutdown = Some(shared_shutdown_signal(shutdown_rx)); + let provider_shutdown_tx = shutdown_tx.clone(); + let provider_task = tokio::spawn(run_provider_stream_loop( + plugin_host.clone(), + derived_state_host.clone(), + provider_shutdown, + mode, + provider_stream_rx, + None, + gossip_control_plane, + )); + let provider_shutdown_notifier = tokio::spawn(async move { + let result = 
provider_task.await; + let _ignored = provider_shutdown_tx.send(true); + result + }); + + let raw_result = { + #[cfg(feature = "kernel-bypass")] + { + if let Some(packet_ingest_rx) = packet_ingest_rx { + app_runtime::run_async_with_hosts_and_kernel_bypass_ingress( + plugin_host, + extension_host, + derived_state_host, + raw_shutdown, + packet_ingest_rx, + ) + .await + } else { + app_runtime::run_async_with_hosts( + plugin_host, + extension_host, + derived_state_host, + raw_shutdown, + ) + .await + } + } + #[cfg(not(feature = "kernel-bypass"))] + { + app_runtime::run_async_with_hosts( + plugin_host, + extension_host, + derived_state_host, + raw_shutdown, + ) + .await + } + } + .map_err(RuntimeError::from); + + let _ignored = shutdown_tx.send(true); + let provider_result = provider_shutdown_notifier.await.map_err(|error| { + RuntimeError::Runloop(format!( + "combined provider-stream runtime task join failed: {error}" + )) + })?; + let provider_result = provider_result.map_err(|error| { + RuntimeError::Runloop(format!( + "combined provider-stream runtime task join failed: {error}" + )) + })?; + + raw_result?; + provider_result?; + tracing::info!( + mode = mode.as_str(), + "SOF combined raw and provider-stream runtime stopped" + ); + Ok(()) +} + +fn shared_shutdown_signal(mut shutdown_rx: tokio::sync::watch::Receiver) -> ShutdownSignal { + Box::pin(async move { + loop { + if *shutdown_rx.borrow() { + return; + } + if shutdown_rx.changed().await.is_err() { + return; + } + } + }) +} + +async fn run_provider_stream_loop( + plugin_host: PluginHost, + derived_state_host: DerivedStateHost, + shutdown_signal: Option, + mode: ProviderStreamMode, + mut provider_stream_rx: ProviderStreamReceiver, + observability_handle: Option, + mut gossip_control_plane: ProviderRuntimeGossipControlPlane, +) -> Result<(), RuntimeError> { + tracing::info!( + mode = mode.as_str(), + "starting SOF provider-stream event loop" + ); let mut shutdown_signal = shutdown_signal; let mut replay_dedupe = 
ProviderReplayDedupe::new(PROVIDER_REPLAY_DEDUPE_CAPACITY); let mut provider_health = ProviderStreamHealth::default(); @@ -2016,11 +2193,6 @@ async fn run_provider_stream_runtime( } }; - plugin_host.shutdown().await; - extension_host.shutdown().await; - if let Some(service) = observability { - service.shutdown().await; - } gossip_control_plane.shutdown().await; if result.is_ok() { tracing::info!(mode = mode.as_str(), "SOF provider-stream runtime stopped"); diff --git a/docs/README.md b/docs/README.md index d9f97202..50f88521 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,8 @@ This folder is the documentation index for the SOF workspace. -Use it as the entry point for architecture references, operations guides, and crate-level setup docs. +Use it as the entry point for architecture references, operations guides, crate-level setup docs, +and SDK docs. The project is aimed at low-latency Solana infrastructure: ingest, replayable state, execution control planes, and deployment/tuning concerns that matter in real trading and market-data systems. 
@@ -26,6 +27,11 @@ Transaction SDK users: - `../crates/sof-tx/README.md` - `architecture/adr/0006-transaction-sdk-and-dual-submit-routing.md` +TypeScript SDK users: + +- `gitbook/sdk/README.md` +- `../sdks/typescript/README.md` + Contributors: - `../CONTRIBUTING.md` diff --git a/docs/gitbook/README.md b/docs/gitbook/README.md index a6210d1c..fd56dc98 100644 --- a/docs/gitbook/README.md +++ b/docs/gitbook/README.md @@ -11,6 +11,10 @@ The public crates are: - `sof-tx`: transaction construction and submission - `sof-gossip-tuning`: typed tuning profiles for `sof` +The public application-facing SDK is: + +- `@lythaeon-sof/sdk`: TypeScript app SDK backed by the SOF native runtime host + There is also one internal backend crate: - `sof-solana-gossip`: vendored gossip bootstrap backend used by optional gossip mode @@ -42,6 +46,7 @@ If you are evaluating SOF as a product, read these first: - [Why SOF Exists](use-sof/why-sof-exists.md) - [SOF Compared To The Usual Alternatives](use-sof/sof-compared.md) +- [TypeScript SDK](sdk/typescript.md) - [Before You Start](getting-started/before-you-start.md) - [Common Questions](getting-started/common-questions.md) - [System Overview](architecture/system-overview.md) @@ -59,6 +64,16 @@ Choose this track if you want to: Start here: [Use SOF](use-sof/README.md) +### SDKs + +Choose this track if you want to: + +- build a SOF app from Node.js +- use the Rust runtime underneath without managing it directly +- understand the TypeScript package and native runtime host model + +Start here: [SDKs](sdk/README.md) + ### Maintain SOF Choose this track if you are working inside the repository and need: diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 811426cf..44af4d76 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -24,6 +24,9 @@ - [Relay, Repair, and Traffic](operations/relay-repair-and-traffic.md) - [Tuning and Environment Controls](operations/tuning-and-env.md) - [Knob 
Registry](operations/knob-registry.md) +- [SDKs](sdk/README.md) + - [TypeScript SDK](sdk/typescript.md) + - [Runtime Host and Packaging](sdk/runtime-host.md) - [Maintain SOF](maintainers/README.md) - [Docs Site](maintainers/docs-site.md) - [Repository Layout](getting-started/workspace-layout.md) diff --git a/docs/gitbook/_layouts/website/page.html b/docs/gitbook/_layouts/website/page.html index 33afbce1..e138f2f2 100644 --- a/docs/gitbook/_layouts/website/page.html +++ b/docs/gitbook/_layouts/website/page.html @@ -135,6 +135,7 @@