Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Agent-assisted contributions are welcome, but should be **supervised** and **rev
- **Primary server binary**: `skit` (crate: `streamkit-server`).
- **Dev task runner**: `just` (see `justfile`).
- **Docs**: Astro + Starlight in `docs/` (sidebar in `docs/astro.config.mjs`).
- **UI tooling**: Bun-first. Use `bun install`, `bunx` (or `bun run` scripts) for UI work—avoid npm/pnpm.

## Workflow expectations

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 44 additions & 22 deletions apps/skit-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tracing::{debug, error, info};
use url::Url;

/// Represents one multipart input file for oneshot execution.
#[derive(Debug, Clone)]
pub struct InputFile {
pub field: String,
pub path: String,
pub content_type: Option<String>,
}

fn http_base_url(server_url: &str) -> Result<Url, Box<dyn std::error::Error + Send + Sync>> {
let mut url = Url::parse(server_url)?;
match url.scheme() {
Expand Down Expand Up @@ -167,12 +175,12 @@ fn parse_batch_operations(
#[allow(clippy::cognitive_complexity)]
pub async fn process_oneshot(
pipeline_path: &str,
input_path: &str,
inputs: &[InputFile],
output_path: &str,
server_url: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = reqwest::Client::new();
process_oneshot_with_client(&client, pipeline_path, input_path, output_path, server_url).await
process_oneshot_with_client(&client, pipeline_path, inputs, output_path, server_url).await
}

/// Process a pipeline using a remote server in oneshot mode with a caller-provided HTTP client.
Expand All @@ -192,13 +200,17 @@ pub async fn process_oneshot(
pub async fn process_oneshot_with_client(
client: &reqwest::Client,
pipeline_path: &str,
input_path: &str,
inputs: &[InputFile],
output_path: &str,
server_url: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if inputs.is_empty() {
return Err("At least one input file is required".into());
}

info!(
pipeline = %pipeline_path,
input = %input_path,
inputs = inputs.len(),
output = %output_path,
server = %server_url,
"Starting oneshot pipeline processing"
Expand All @@ -208,31 +220,41 @@ pub async fn process_oneshot_with_client(
if !Path::new(pipeline_path).exists() {
return Err(format!("Pipeline file not found: {pipeline_path}").into());
}
if !Path::new(input_path).exists() {
return Err(format!("Input file not found: {input_path}").into());
for input in inputs {
if !Path::new(&input.path).exists() {
return Err(format!("Input file not found: {}", input.path).into());
}
}

// Read pipeline configuration
debug!("Reading pipeline configuration from {pipeline_path}");
let pipeline_content = fs::read_to_string(pipeline_path).await?;

// Read input media file
debug!("Reading input media file from {input_path}");
let media_data = fs::read(input_path).await?;

// Extract filename for the multipart form
let input_filename = Path::new(input_path)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("input")
.to_string();

// Create multipart form
let media_len = media_data.len();
debug!("Creating multipart form with {media_len} bytes of media data");
let form = multipart::Form::new()
.text("config", pipeline_content)
.part("media", multipart::Part::bytes(media_data).file_name(input_filename));
let mut form = multipart::Form::new().text("config", pipeline_content);
for input in inputs {
debug!("Reading input media file from {}", input.path);
let media_data = fs::read(&input.path).await?;
let media_len = media_data.len();

let input_filename = Path::new(&input.path)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("input")
.to_string();

debug!(
"Adding multipart field '{}' with {} bytes (file: {})",
input.field, media_len, input_filename
);

let mut part = multipart::Part::bytes(media_data).file_name(input_filename);
if let Some(ct) = &input.content_type {
part = part.mime_str(ct)?;
}

form = form.part(input.field.clone(), part);
}

// Send request to server
let url = http_base_url(server_url)?.join("/api/v1/process")?;
Expand Down
2 changes: 1 addition & 1 deletion apps/skit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use client::{
destroy_session, get_config, get_permissions, get_pipeline, get_sample, list_audio_assets,
list_node_schemas, list_packet_schemas, list_plugins, list_samples_dynamic,
list_samples_oneshot, list_sessions, process_oneshot, save_sample, tune_node,
upload_audio_asset, upload_plugin, watch_events,
upload_audio_asset, upload_plugin, watch_events, InputFile,
};
pub use load_test::run_load_test;

Expand Down
6 changes: 5 additions & 1 deletion apps/skit-cli/src/load_test/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ pub async fn oneshot_worker(
let result = process_oneshot_with_client(
&client,
pipeline_path,
input_path,
&[crate::client::InputFile {
field: "media".to_string(),
path: input_path.clone(),
content_type: None,
}],
output_path,
&config.server.url,
)
Expand Down
34 changes: 30 additions & 4 deletions apps/skit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//
// SPDX-License-Identifier: MPL-2.0

use clap::{Parser, Subcommand};
use clap::{ArgAction, Parser, Subcommand};
use streamkit_client::InputFile;
use tracing::{error, info};

#[derive(Parser, Debug)]
Expand All @@ -12,15 +13,34 @@ struct Cli {
command: Commands,
}

#[derive(Debug, Clone)]
struct FieldPath {
field: String,
path: String,
}

fn parse_field_path(s: &str) -> Result<FieldPath, String> {
let mut parts = s.splitn(2, '=');
let field = parts.next().unwrap_or("").trim();
let path = parts.next().unwrap_or("").trim();
if field.is_empty() || path.is_empty() {
return Err("expected form name=path".to_string());
}
Ok(FieldPath { field: field.to_string(), path: path.to_string() })
}

#[derive(Subcommand, Debug)]
enum Commands {
/// Process a pipeline using a remote server (oneshot mode)
#[command(name = "oneshot")]
OneShot {
/// Path to the pipeline YAML file
pipeline: String,
/// Input media file path
/// Primary input media file path (multipart field defaults to 'media')
input: String,
/// Additional input fields in the form name=path (repeatable)
#[arg(long = "input", value_parser = parse_field_path, action = ArgAction::Append)]
extra_input: Vec<FieldPath>,
/// Output file path
output: String,
/// Server URL (default: http://127.0.0.1:4545)
Expand Down Expand Up @@ -329,11 +349,17 @@ async fn main() {
let cli = Cli::parse();

match cli.command {
Commands::OneShot { pipeline, input, output, server } => {
Commands::OneShot { pipeline, input, extra_input, output, server } => {
info!("Starting StreamKit client - oneshot processing");

let mut inputs = Vec::new();
inputs.push(InputFile { field: "media".to_string(), path: input, content_type: None });
for extra in extra_input {
inputs.push(InputFile { field: extra.field, path: extra.path, content_type: None });
}

if let Err(e) =
streamkit_client::process_oneshot(&pipeline, &input, &output, &server).await
streamkit_client::process_oneshot(&pipeline, &inputs, &output, &server).await
{
// Error already logged via tracing above
error!(error = %e, "Failed to process oneshot pipeline");
Expand Down
7 changes: 6 additions & 1 deletion apps/skit-cli/src/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,12 @@ impl Shell {

// Use the existing process_oneshot function from client.rs
// This makes a multipart HTTP POST to /api/v1/process
crate::client::process_oneshot(pipeline_path, input_path, output_path, &http_url).await?;
let inputs = vec![crate::client::InputFile {
field: "media".to_string(),
path: input_path.to_string(),
content_type: None,
}];
crate::client::process_oneshot(pipeline_path, &inputs, output_path, &http_url).await?;

println!("✅ Oneshot processing completed successfully");

Expand Down
1 change: 1 addition & 0 deletions apps/skit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ anyhow = "1.0"
# For HTTP server
axum = { version = "0.8", features = ["multipart", "ws"] }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tower = "0.5.3"
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "set-header"] }
tokio-stream = "0.1.18"
Expand Down
34 changes: 33 additions & 1 deletion apps/skit/src/bin/gen-docs-reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,39 @@ fn add_synthetic_oneshot_nodes(defs: &mut Vec<NodeDefinition>) {
Receives binary data from the HTTP request body."
.to_string(),
),
param_schema: serde_json::json!({}),
param_schema: serde_json::json!({
"type": "object",
"additionalProperties": false,
"properties": {
"field": {
"type": "string",
"description": "Multipart field name to bind to this input. Defaults to 'media' when only one http_input node exists; otherwise defaults to the node id."
},
"fields": {
"type": "array",
"description": "Optional list of multipart fields for this node. When set, the node exposes one output pin per entry (pin name matches the field name). Entries may be strings or objects with { name, required }.",
"items": {
"oneOf": [
{ "type": "string" },
{
"type": "object",
"additionalProperties": false,
"properties": {
"name": { "type": "string" },
"required": { "type": "boolean", "default": true }
},
"required": ["name"]
}
]
}
},
"required": {
"type": "boolean",
"description": "If true (default), the request must include this field.",
"default": true
}
}
}),
inputs: vec![],
outputs: vec![OutputPin {
name: "out".to_string(),
Expand Down
Loading
Loading