(
+ request_manager: &fusillade::PostgresRequestManager,
+ db_pool: &sqlx::PgPool,
+ config: &crate::config::Config,
+ input: &IngestFileInput,
+) -> std::result::Result<(), underway::task::Error> {
+ use underway::task::Error as TaskError;
+
+ let Some(store_config) = config.batches.files.object_store.as_ref() else {
+ return Err(TaskError::Fatal("object store config missing".to_string()));
+ };
+
+ let client = BlobStorageClient::new(store_config)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("init object storage client: {e}")))?;
+
+ set_file_ingest_status(db_pool, input.file_id, &input.object_key, "processing", None)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("set ingest status processing: {e}")))?;
+
+ let bytes = client
+ .get_file_bytes(&input.object_key)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("download blob for ingest: {e}")))?;
+
+ let content = std::str::from_utf8(&bytes).map_err(|e| TaskError::Fatal(format!("Invalid UTF-8 in upload: {e}")))?;
+
+ let mut conn = db_pool
+ .acquire()
+ .await
+ .map_err(|e| TaskError::Retryable(format!("acquire db conn for ingest: {e}")))?;
+
+ let mut deployments_repo = Deployments::new(&mut conn);
+ let target_user_id =
+ Uuid::parse_str(&input.uploaded_by).map_err(|e| TaskError::Fatal(format!("invalid uploaded_by owner id on file ingest: {e}")))?;
+ let filter = DeploymentFilter::new(0, i64::MAX)
+ .with_accessible_to(target_user_id)
+ .with_statuses(vec![ModelStatus::Active])
+ .with_deleted(false);
+ let accessible_deployments = deployments_repo
+ .list(&filter)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("query accessible deployments for ingest: {e}")))?;
+ let accessible_models: HashMap<_, _> =
+ accessible_deployments.into_iter().map(|d| (d.alias, d.model_type)).collect();
+ drop(conn);
+
+ let mut templates = Vec::new();
+ let mut non_empty_lines: usize = 0;
+ for (idx, line) in content.lines().enumerate() {
+ let line_no = idx + 1;
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ continue;
+ }
+ non_empty_lines += 1;
+ if config.limits.files.max_requests_per_file > 0 && non_empty_lines > config.limits.files.max_requests_per_file {
+ return Err(TaskError::Fatal(format!(
+ "Line {}: file contains too many requests (>{})",
+ line_no, config.limits.files.max_requests_per_file
+ )));
+ }
+
+ let openai_req: OpenAIBatchRequest =
+ serde_json::from_str(trimmed).map_err(|e| TaskError::Fatal(format!("Invalid JSON on line {}: {}", line_no, e)))?;
+ let template = openai_req
+ .to_internal(
+ &input.endpoint,
+ input.api_key.clone(),
+ &accessible_models,
+ &config.batches.allowed_url_paths,
+ )
+ .map_err(|e| TaskError::Fatal(format!("Line {}: {}", line_no, e)))?;
+
+ if config.limits.requests.max_body_size > 0 && template.body.len() as u64 > config.limits.requests.max_body_size {
+ return Err(TaskError::Fatal(format!(
+ "Line {}: Request body is {} bytes, exceeds max {} bytes",
+ line_no,
+ template.body.len(),
+ config.limits.requests.max_body_size
+ )));
+ }
+ templates.push(template);
+ }
+
+ if templates.is_empty() {
+ return Err(TaskError::Fatal("File contains no valid request templates".to_string()));
+ }
+
+ request_manager
+ .populate_file_templates(fusillade::FileId(input.file_id), templates)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("populate file templates: {e}")))?;
+
+ Ok(())
+}
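`set_file_ingest_status` itself is not part of this hunk. A minimal sketch of what it presumably does, with the upsert shape borrowed from the `file_ingest_jobs` statement used in the tests further down (the error-conversion details are an assumption):

```rust
// Hedged sketch -- the real helper lives elsewhere in this file. Assumes the
// crate's Result/Error can absorb sqlx::Error, as other code here does.
async fn set_file_ingest_status(
    pool: &sqlx::PgPool,
    file_id: uuid::Uuid,
    object_key: &str,
    status: &str,
    error_message: Option<&str>,
) -> crate::errors::Result<()> {
    sqlx::query(
        r#"
        INSERT INTO file_ingest_jobs (file_id, object_key, status, error_message, created_at, updated_at)
        VALUES ($1, $2, $3, $4, NOW(), NOW())
        ON CONFLICT (file_id) DO UPDATE
            SET object_key = EXCLUDED.object_key,
                status = EXCLUDED.status,
                error_message = EXCLUDED.error_message,
                updated_at = NOW()
        "#,
    )
    .bind(file_id)
    .bind(object_key)
    .bind(status)
    .bind(error_message)
    .execute(pool)
    .await?;
    Ok(())
}
```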
+
+pub async fn build_ingest_file_job(
+ pool: sqlx::PgPool,
+ state: crate::tasks::TaskState,
+) -> anyhow::Result<underway::Job<IngestFileInput, crate::tasks::TaskState>> {
+ use underway::Job;
+ use underway::job::To;
+ use underway::task::Error as TaskError;
+
+ Job::<IngestFileInput, _>::builder()
+ .state(state)
+ .step(|cx, input: IngestFileInput| async move {
+ let config = cx.state.config.snapshot();
+ match ingest_blob_to_templates(cx.state.request_manager.as_ref(), &cx.state.db_pool, &config, &input).await {
+ Ok(()) => {
+ set_file_ingest_status(&cx.state.db_pool, input.file_id, &input.object_key, "processed", None)
+ .await
+ .map_err(|e| TaskError::Retryable(format!("set ingest status processed: {e}")))?;
+ tracing::info!(file_id = %input.file_id, "File ingest completed");
+ To::done()
+ }
+ Err(TaskError::Fatal(msg)) => {
+ let _ = set_file_ingest_status(&cx.state.db_pool, input.file_id, &input.object_key, "failed", Some(&msg)).await;
+ Err(TaskError::Fatal(msg))
+ }
+ Err(TaskError::Retryable(msg)) => {
+ let _ = set_file_ingest_status(
+ &cx.state.db_pool,
+ input.file_id,
+ &input.object_key,
+ "pending",
+ Some(&format!("retrying after transient error: {msg}")),
+ )
+ .await;
+ Err(TaskError::Retryable(msg))
+ }
+ Err(other) => Err(other),
+ }
+ })
+ .name("ingest-file")
+ .pool(pool)
+ .build()
+ .await
+ .map_err(Into::into)
+}
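How this job is wired up is only partially visible in this diff; a hedged sketch of the intended usage (`task_state` and `ingest_input` are illustrative names, the `enqueue` call mirrors the handler code below):

```rust
// Built once at startup; the upload handler below reaches it via
// state.task_runner.ingest_file_job.
let ingest_file_job = build_ingest_file_job(pool.clone(), task_state).await?;

// One enqueue per uploaded file (mirrors upload_file below). A worker for the
// "ingest-file" queue must be running elsewhere for enqueued jobs to execute.
ingest_file_job.enqueue(&ingest_input).await?;
```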
+
/// Helper that creates a stream of FileStreamItem values from a multipart upload.
/// The entire multipart parsing happens inside the stream itself.
#[tracing::instrument(skip(multipart, req_ctx), fields(config.max_file_size, config.max_requests_per_file, uploaded_by = ?uploaded_by, endpoint = %req_ctx.endpoint, config.buffer_size))]
@@ -803,7 +1061,7 @@ Each line must be a valid JSON object containing `custom_id`, `method`, `url`, a
description = "Multipart form with `file` (the JSONL file) and `purpose` (must be `batch`)."
),
responses(
- (status = 201, description = "File uploaded and validated successfully.", body = FileResponse),
+ (status = 201, description = "File accepted for processing.", body = FileResponse),
(status = 400, description = "Invalid file format, malformed JSON, missing required fields, etc."),
(status = 403, description = "Model referenced in the file is not configured or not accessible to your account."),
(status = 413, description = "File exceeds the maximum allowed size."),
@@ -850,12 +1108,6 @@ pub async fn upload_file(
message: format!("Invalid multipart request: {}", e),
})?;
- let stream_config = FileStreamConfig {
- max_file_size: config.limits.files.max_file_size,
- max_requests_per_file: config.limits.files.max_requests_per_file,
- max_request_body_size: config.limits.requests.max_body_size,
- buffer_size: config.batches.files.upload_buffer_size,
- };
// When in org context, attribute file ownership to the org (not the individual user).
// Also used for the hidden API key lookup below.
let target_user_id = current_user.active_organization.unwrap_or(current_user.id);
@@ -886,44 +1138,157 @@ pub async fn upload_file(
// drop conn so it isn't persisted for entire upload process
drop(conn);
- // Create a stream that parses the multipart upload and yields FileStreamItems
- let (file_stream, error_slot) = create_file_stream(
- multipart,
- stream_config,
- uploaded_by,
- FileRequestContext {
+ let created_file_id = if config.batches.files.storage_backend == FileStorageBackend::ObjectStore {
+ let object_store_cfg = config.batches.files.object_store.as_ref().ok_or_else(|| Error::Internal {
+ operation: "object store backend selected but no object_store config found".to_string(),
+ })?;
+ let blob = BlobStorageClient::new(object_store_cfg).await?;
+
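+ // Spool the multipart body to a temp file first: this lets us enforce the
+ // size limit while streaming and do a single put to object storage at the end.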
+ let mut multipart = multipart;
+ let mut filename: Option<String> = None;
+ let mut purpose: Option<String> = None;
+ let mut total_size: u64 = 0;
+ let file_id = Uuid::new_v4();
+ let tmp_path = format!("/tmp/dwctl-upload-{}.jsonl", file_id);
+ let mut tmp_file = tokio::fs::File::create(&tmp_path).await.map_err(|e| Error::Internal {
+ operation: format!("create temp upload file: {e}"),
+ })?;
+ let mut saw_file = false;
+
+ while let Some(field) = multipart.next_field().await.map_err(|e| Error::BadRequest {
+ message: format!("Invalid multipart upload: {}", e),
+ })? {
+ match field.name().unwrap_or("") {
+ "purpose" => {
+ purpose = Some(field.text().await.map_err(|e| Error::BadRequest {
+ message: format!("Read purpose field: {e}"),
+ })?);
+ }
+ "file" => {
+ saw_file = true;
+ filename = field.file_name().map(|s| s.to_string());
+ let mut field = field;
+ while let Some(chunk) = field.chunk().await.map_err(|e| Error::BadRequest {
+ message: format!("Read upload chunk: {e}"),
+ })? {
+ total_size = total_size.saturating_add(chunk.len() as u64);
+ if max_file_size > 0 && total_size > max_file_size {
+ return Err(Error::PayloadTooLarge {
+ message: format!("File exceeds the maximum allowed size of {} bytes", max_file_size),
+ });
+ }
+ tmp_file.write_all(&chunk).await.map_err(|e| Error::Internal {
+ operation: format!("write temp upload bytes: {e}"),
+ })?;
+ }
+ }
+ _ => {}
+ }
+ }
+
+ if !saw_file {
+ return Err(Error::BadRequest {
+ message: "No file field found in multipart upload".to_string(),
+ });
+ }
+ if total_size == 0 {
+ return Err(Error::BadRequest {
+ message: "File contains no valid request templates".to_string(),
+ });
+ }
+ if purpose.as_deref() != Some("batch") {
+ return Err(Error::BadRequest {
+ message: "Invalid purpose. Only 'batch' is supported.".to_string(),
+ });
+ }
+
+ tmp_file.flush().await.map_err(|e| Error::Internal {
+ operation: format!("flush temp upload file: {e}"),
+ })?;
+
+ let object_key = blob.object_key_for_file(file_id);
+ blob.put_file_from_path(&object_key, &tmp_path, "application/x-ndjson").await?;
+ let _ = tokio::fs::remove_file(&tmp_path).await;
+
+ let ingest = IngestFileInput {
+ file_id,
+ object_key: object_key.clone(),
endpoint,
api_key: user_api_key,
- accessible_models,
- allowed_url_paths: config.batches.allowed_url_paths.clone(),
- },
- Some(api_key_id),
- );
-
- // Create file via request manager with streaming
- let created_file_result = state.request_manager.create_file_stream(file_stream).await.map_err(|e| {
- // Check if WE aborted (control-layer error in slot)
- // Handle poisoned mutex gracefully - the data is still valid
- let upload_err = match error_slot.lock() {
- Ok(mut guard) => guard.take(),
- Err(poisoned) => poisoned.into_inner().take(),
+ api_key_id,
+ uploaded_by: uploaded_by.clone().unwrap_or_default(),
+ filename: filename.clone().unwrap_or_else(|| "upload.jsonl".to_string()),
+ size_bytes: i64::try_from(total_size).unwrap_or(i64::MAX),
};
- if let Some(upload_err) = upload_err {
- tracing::warn!("File upload aborted with error: {:?}", upload_err);
- return upload_err.into_http_error();
- }
- // Otherwise it's a fusillade error
- tracing::warn!("Fusillade error during file upload: {:?}", e);
- match e {
- fusillade::FusilladeError::ValidationError(msg) => Error::BadRequest { message: msg },
- _ => Error::Internal {
- operation: format!("create file: {}", e),
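+ // Create a placeholder row up front so the file is immediately listable;
+ // the enqueued ingest job fills in the request templates asynchronously.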
+ state
+ .request_manager
+ .create_file_placeholder(fusillade::FilePlaceholderInput {
+ file_id: fusillade::FileId(file_id),
+ metadata: fusillade::FileMetadata {
+ filename: Some(ingest.filename.clone()),
+ description: None,
+ purpose: Some("batch".to_string()),
+ expires_after_anchor: None,
+ expires_after_seconds: None,
+ size_bytes: Some(ingest.size_bytes),
+ uploaded_by: Some(ingest.uploaded_by.clone()),
+ api_key_id: Some(ingest.api_key_id),
+ },
+ })
+ .await
+ .map_err(|e| Error::Internal {
+ operation: format!("create file placeholder: {e}"),
+ })?;
+ set_file_ingest_status(state.db.write(), file_id, &object_key, "pending", None).await?;
+
+ state
+ .task_runner
+ .ingest_file_job
+ .enqueue(&ingest)
+ .await
+ .map_err(|e| Error::Internal {
+ operation: format!("enqueue file ingest job: {e}"),
+ })?;
+ file_id.into()
+ } else {
+ let stream_config = FileStreamConfig {
+ max_file_size: config.limits.files.max_file_size,
+ max_requests_per_file: config.limits.files.max_requests_per_file,
+ max_request_body_size: config.limits.requests.max_body_size,
+ buffer_size: config.batches.files.upload_buffer_size,
+ };
+ let (file_stream, error_slot) = create_file_stream(
+ multipart,
+ stream_config,
+ uploaded_by,
+ FileRequestContext {
+ endpoint,
+ api_key: user_api_key,
+ accessible_models,
+ allowed_url_paths: config.batches.allowed_url_paths.clone(),
},
- }
- })?;
+ Some(api_key_id),
+ );
- let created_file_id = resolve_upload_stream_result(created_file_result, &error_slot)?;
+ let created_file_result = state.request_manager.create_file_stream(file_stream).await.map_err(|e| {
+ let upload_err = match error_slot.lock() {
+ Ok(mut guard) => guard.take(),
+ Err(poisoned) => poisoned.into_inner().take(),
+ };
+ if let Some(upload_err) = upload_err {
+ tracing::warn!("File upload aborted with error: {:?}", upload_err);
+ return upload_err.into_http_error();
+ }
+ tracing::warn!("Fusillade error during file upload: {:?}", e);
+ match e {
+ fusillade::FusilladeError::ValidationError(msg) => Error::BadRequest { message: msg },
+ _ => Error::Internal {
+ operation: format!("create file: {}", e),
+ },
+ }
+ })?;
+
+ resolve_upload_stream_result(created_file_result, &error_slot)?
+ };
tracing::debug!("File {} uploaded successfully", created_file_id);
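The branch above selects on `FileStorageBackend` from `crate::config`, which this diff doesn't show. A plausible shape (only the `ObjectStore` variant is confirmed by this diff; the fallback variant's name is a guess):

```rust
// Assumed definition; only ObjectStore appears in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStorageBackend {
    /// Legacy path: stream parsed templates straight into fusillade.
    Database,
    /// Stage the raw upload in object storage and ingest asynchronously.
    ObjectStore,
}
```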
@@ -953,6 +1318,7 @@ pub async fn upload_file(
Some(fusillade::batch::Purpose::BatchError) => Purpose::BatchError,
None => Purpose::Batch, // Default to Batch for backwards compatibility
};
+ let file_status = map_ingest_job_to_file_status(get_file_ingest_status(state.db.read(), created_file_id.0).await?.as_ref());
Ok((
StatusCode::CREATED,
@@ -963,6 +1329,8 @@ pub async fn upload_file(
created_at: file.created_at.timestamp(),
filename: file.name,
purpose: api_purpose,
+ status: file_status.status,
+ status_details: file_status.status_details,
expires_at: file.expires_at.map(|dt| dt.timestamp()),
created_by_email: None,
context_name: None,
@@ -1102,10 +1470,12 @@ pub async fn list_files(
let first_id = files.first().map(|f| f.id.0.to_string());
let last_id = files.last().map(|f| f.id.0.to_string());
+ let file_ids: Vec<Uuid> = files.iter().map(|f| f.id.0).collect();
// Resolve creator/context metadata for all returned files.
// Uses a fresh connection (not held across the fusillade call above).
let mut read_conn = state.db.read().acquire().await.map_err(|e| Error::Database(e.into()))?;
+ let ingest_statuses = get_file_ingest_statuses(state.db.read(), &file_ids).await?;
// Resolve individual creators via api_key_id → api_keys.created_by
let api_key_ids: Vec<Uuid> = files
@@ -1174,6 +1544,7 @@ pub async fn list_files(
Some(_) => (Some("Personal".to_string()), Some("personal".to_string())),
None => (None, None),
};
+ let file_status = map_ingest_job_to_file_status(ingest_statuses.get(&f.id.0));
FileResponse {
id: f.id.0.to_string(),
@@ -1182,6 +1553,8 @@ pub async fn list_files(
created_at: f.created_at.timestamp(),
filename: f.name.clone(),
purpose: api_purpose,
+ status: file_status.status,
+ status_details: file_status.status_details,
expires_at: f.expires_at.map(|dt| dt.timestamp()),
created_by_email,
context_name,
@@ -1250,6 +1623,8 @@ pub async fn get_file(
Some(fusillade::batch::Purpose::BatchError) => Purpose::BatchError,
None => Purpose::Batch, // Default to Batch for backwards compatibility
};
+ let ingest_status = get_file_ingest_status(state.db.read(), file_id).await?;
+ let file_status = map_ingest_job_to_file_status(ingest_status.as_ref());
// Enrich with creator/context metadata (same as list_files)
let mut read_conn = state.db.read().acquire().await.map_err(|e| Error::Database(e.into()))?;
@@ -1303,6 +1678,8 @@ pub async fn get_file(
created_at: file.created_at.timestamp(),
filename: file.name,
purpose: api_purpose,
+ status: file_status.status,
+ status_details: file_status.status_details,
expires_at: file.expires_at.map(|dt| dt.timestamp()),
created_by_email,
context_name,
@@ -1362,8 +1739,8 @@ pub async fn get_file_content(
// For BatchOutput and BatchError files, check if the batch is still running
// (which means more data may be written to this file in the future).
// Also capture the expected content count for streaming X-Last-Line.
- let (file_may_receive_more_data, file_content_count) = match file.purpose {
- Some(fusillade::batch::Purpose::Batch) => (false, None), // Input files: count unknown without query
+ let (file_may_receive_more_data, file_content_count, cacheable_completed_result) = match file.purpose {
+ Some(fusillade::batch::Purpose::Batch) => (false, None, false), // Input files: count unknown without query
Some(fusillade::batch::Purpose::BatchOutput) => {
let batch = state
.request_manager
@@ -1381,9 +1758,12 @@ pub async fn get_file_content(
operation: format!("get batch status: {}", e),
})?;
let still_processing = !status.is_finished();
- (still_processing, Some(status.completed_requests as usize))
+ (still_processing, Some(status.completed_requests as usize), status.is_finished())
} else {
- (false, None)
+ return Err(Error::NotFound {
+ resource: "File".to_string(),
+ id: file_id_str.clone(),
+ });
}
}
Some(fusillade::batch::Purpose::BatchError) => {
@@ -1403,12 +1783,15 @@ pub async fn get_file_content(
operation: format!("get batch status: {}", e),
})?;
let still_processing = !status.is_finished();
- (still_processing, Some(status.failed_requests as usize))
+ (still_processing, Some(status.failed_requests as usize), status.is_finished())
} else {
- (false, None)
+ return Err(Error::NotFound {
+ resource: "File".to_string(),
+ id: file_id_str.clone(),
+ });
}
}
- None => (false, None), // Shouldn't happen, but assume complete
+ None => (false, None, false), // Shouldn't happen, but assume complete
};
// Stream the file content as JSONL, starting from offset
@@ -1438,6 +1821,21 @@ pub async fn get_file_content(
}
}
+ if cacheable_completed_result {
+ let config = state.current_config();
+ let cached_bytes = batch_result_cache::get_or_build_file_content_jsonl(
+ config.as_ref(),
+ state.request_manager.as_ref(),
+ fusillade::FileId(file_id),
+ search.clone(),
+ )
+ .await?;
+
+ let slice = batch_result_cache::slice_jsonl_bytes(&cached_bytes, offset, requested_limit);
+ let incomplete = slice.has_more_pages;
+ return Ok(batch_result_cache::jsonl_response_from_slice_with_offset(slice, offset, incomplete));
+ }
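`slice_jsonl_bytes` comes from the new `batch_result_cache` module at the bottom of this diff; its body is not included in this section. A rough sketch of the behavior the call site implies (everything beyond the `has_more_pages` field is an assumption):

```rust
// Hypothetical shape of the slice returned to the response builder.
pub struct JsonlSlice<'a> {
    pub lines: Vec<&'a [u8]>,
    pub has_more_pages: bool,
}

pub fn slice_jsonl_bytes(bytes: &[u8], offset: usize, limit: Option<usize>) -> JsonlSlice<'_> {
    let lines = bytes
        .split(|b| *b == b'\n')
        .filter(|l| !l.is_empty())
        .skip(offset);
    match limit {
        // Page request: look at limit + 1 records so the caller can tell
        // whether another page exists without materializing everything.
        Some(n) => {
            let mut page: Vec<_> = lines.take(n + 1).collect();
            let has_more_pages = page.len() > n;
            page.truncate(n);
            JsonlSlice { lines: page, has_more_pages }
        }
        // Unlimited download: return the whole tail.
        None => JsonlSlice { lines: lines.collect(), has_more_pages: false },
    }
}
```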
+
if let Some(limit) = requested_limit {
// Pagination case: buffer only N+1 items to check for more pages
let content_stream = state
@@ -1567,6 +1965,9 @@ pub async fn delete_file(
});
}
+ let config = state.current_config();
+ batch_result_cache::invalidate_cached_file_results(config.as_ref(), file_id).await?;
+
// Perform the deletion (hard delete - cascades to batches and requests)
state
.request_manager
@@ -1797,6 +2198,103 @@ mod tests {
use std::sync::{Arc, Mutex};
use uuid::Uuid;
+ async fn upload_batch_input_file(
+ app: &axum_test::TestServer,
+ user: &crate::api::models::users::UserResponse,
+ jsonl_content: &str,
+ ) -> FileResponse {
+ let file_part = axum_test::multipart::Part::bytes(jsonl_content.as_bytes().to_vec()).file_name("test-batch.jsonl");
+ let auth = add_auth_headers(user);
+
+ let upload_response = app
+ .post("/ai/v1/files")
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .multipart(
+ axum_test::multipart::MultipartForm::new()
+ .add_text("purpose", "batch")
+ .add_part("file", file_part),
+ )
+ .await;
+
+ upload_response.assert_status(axum::http::StatusCode::CREATED);
+ upload_response.json()
+ }
+
+ #[test]
+ fn test_map_ingest_job_to_file_status() {
+ let pending = map_ingest_job_to_file_status(Some(&IngestJobState {
+ status: "pending".to_string(),
+ error_message: None,
+ }));
+ assert_eq!(pending.status, "uploaded");
+ assert_eq!(pending.status_details, "");
+
+ let processing = map_ingest_job_to_file_status(Some(&IngestJobState {
+ status: "processing".to_string(),
+ error_message: None,
+ }));
+ assert_eq!(processing.status, "uploaded");
+ assert_eq!(processing.status_details, "");
+
+ let processed = map_ingest_job_to_file_status(Some(&IngestJobState {
+ status: "processed".to_string(),
+ error_message: None,
+ }));
+ assert_eq!(processed.status, "processed");
+ assert_eq!(processed.status_details, "");
+
+ let failed = map_ingest_job_to_file_status(Some(&IngestJobState {
+ status: "failed".to_string(),
+ error_message: Some("boom".to_string()),
+ }));
+ assert_eq!(failed.status, "error");
+ assert_eq!(failed.status_details, "boom");
+
+ let fallback = map_ingest_job_to_file_status(None);
+ assert_eq!(fallback.status, "processed");
+ assert_eq!(fallback.status_details, "");
+ }
+
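For reference, a sketch of the mapping this test pins down (struct shapes are inferred from field usage in this diff; the real definitions live elsewhere in the file):

```rust
// Hypothetical definitions matching how the fields are used above.
struct IngestJobState { status: String, error_message: Option<String> }
struct FileStatus { status: String, status_details: String }

fn map_ingest_job_to_file_status(job: Option<&IngestJobState>) -> FileStatus {
    let (status, status_details) = match job {
        // Queued or mid-ingest: report OpenAI's "uploaded".
        Some(j) if j.status == "pending" || j.status == "processing" => ("uploaded", String::new()),
        Some(j) if j.status == "failed" => ("error", j.error_message.clone().unwrap_or_default()),
        // "processed", unknown states, and files with no ingest row (the
        // legacy streaming path) all surface as "processed".
        _ => ("processed", String::new()),
    };
    FileStatus { status: status.to_string(), status_details }
}
```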
+ async fn create_batch_for_file(
+ app: &axum_test::TestServer,
+ user: &crate::api::models::users::UserResponse,
+ file_id: &str,
+ ) -> serde_json::Value {
+ let auth = add_auth_headers(user);
+ let batch_response = app
+ .post("/ai/v1/batches")
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .json(&serde_json::json!({
+ "input_file_id": file_id,
+ "endpoint": "/v1/chat/completions",
+ "completion_window": "24h"
+ }))
+ .await;
+
+ batch_response.assert_status(axum::http::StatusCode::CREATED);
+ batch_response.json()
+ }
+
+ async fn wait_for_batch_requests(pool: &PgPool, batch_uuid: Uuid) {
+ for attempt in 0..200 {
+ let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM fusillade.requests WHERE batch_id = $1")
+ .bind(batch_uuid)
+ .fetch_one(pool)
+ .await
+ .expect("Failed to count requests");
+ if count > 0 {
+ return;
+ }
+ assert!(
+ attempt < 199,
+ "Timed out waiting for requests to be populated for batch {batch_uuid}"
+ );
+ tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+ }
+ }
+
#[sqlx::test]
#[test_log::test]
async fn test_upload_and_download_file_content(pool: PgPool) {
@@ -2134,6 +2632,8 @@ mod tests {
// Should succeed
upload_response.assert_status(axum::http::StatusCode::CREATED);
let file: FileResponse = upload_response.json();
+ assert_eq!(file.status, "processed");
+ assert_eq!(file.status_details, "");
// Verify the file was created - now let's check if metadata was captured
// We need to query the database or fusillade to verify the metadata was stored
@@ -2147,6 +2647,57 @@ mod tests {
get_response.assert_status(axum::http::StatusCode::OK);
let retrieved_file: FileResponse = get_response.json();
assert_eq!(retrieved_file.purpose, crate::api::models::files::Purpose::Batch);
+ assert_eq!(retrieved_file.status, "processed");
+ assert_eq!(retrieved_file.status_details, "");
+ }
+
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_get_file_uses_failed_ingest_job_status(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4-model", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let uploaded = upload_batch_input_file(
+ &app,
+ &user,
+ r#"{"custom_id":"req-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Hello"}]}}"#,
+ )
+ .await;
+
+ sqlx::query(
+ r#"
+ INSERT INTO file_ingest_jobs (file_id, object_key, status, error_message, created_at, updated_at)
+ VALUES ($1, $2, $3, $4, NOW(), NOW())
+ ON CONFLICT (file_id) DO UPDATE
+ SET object_key = EXCLUDED.object_key,
+ status = EXCLUDED.status,
+ error_message = EXCLUDED.error_message,
+ updated_at = NOW()
+ "#,
+ )
+ .bind(Uuid::parse_str(&uploaded.id).unwrap())
+ .bind("uploads/test.jsonl")
+ .bind("failed")
+ .bind("line 2: invalid JSON")
+ .execute(&pool)
+ .await
+ .unwrap();
+
+ let auth = add_auth_headers(&user);
+ let response = app
+ .get(&format!("/ai/v1/files/{}", uploaded.id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await;
+
+ response.assert_status(axum::http::StatusCode::OK);
+ let file: FileResponse = response.json();
+ assert_eq!(file.status, "error");
+ assert_eq!(file.status_details, "line 2: invalid JSON");
}
#[sqlx::test]
@@ -2838,6 +3389,91 @@ mod tests {
);
}
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_deleted_batch_output_file_is_no_longer_downloadable(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let jsonl_content = r#"{"custom_id":"req-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test"}]}}
+"#;
+ let file = upload_batch_input_file(&app, &user, jsonl_content).await;
+
+ let batch = create_batch_for_file(&app, &user, &file.id).await;
+ let batch_id = batch["id"].as_str().expect("Should have id");
+ let output_file_id = batch["output_file_id"].as_str().expect("Should have output_file_id");
+
+ let batch_uuid = Uuid::parse_str(batch_id.strip_prefix("batch_").unwrap_or(batch_id)).expect("Valid batch UUID");
+
+ wait_for_batch_requests(&pool, batch_uuid).await;
+
+ sqlx::query(
+ r#"
+ UPDATE fusillade.requests
+ SET state = 'completed', response_status = 200, response_body = '{"choices":[]}', completed_at = NOW()
+ WHERE batch_id = $1
+ "#,
+ )
+ .bind(batch_uuid)
+ .execute(&pool)
+ .await
+ .expect("Failed to complete requests");
+
+ let auth = add_auth_headers(&user);
+ app.delete(&format!("/ai/v1/batches/{}", batch_id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await
+ .assert_status(axum::http::StatusCode::NO_CONTENT);
+
+ app.get(&format!("/ai/v1/files/{}/content", output_file_id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await
+ .assert_status(axum::http::StatusCode::NOT_FOUND);
+ }
+
#[tokio::test]
async fn test_upload_rate_limiting_rejects_when_queue_full() {
use crate::config::FileLimitsConfig;
@@ -3104,12 +3740,6 @@ mod tests {
response.assert_status(axum::http::StatusCode::OK);
response.assert_header("content-type", "application/x-ndjson");
response.assert_header("X-Incomplete", "false");
- // Streaming responses must not have content-length (regression guard)
- assert!(
- response.headers().get("content-length").is_none(),
- "Unlimited download should be streamed without content-length"
- );
-
let body = response.text();
let lines: Vec<&str> = body.trim().lines().collect();
assert_eq!(lines.len(), num_requests, "Should return all {} results", num_requests);
@@ -3153,6 +3783,182 @@ mod tests {
response.assert_header("X-Last-Line", &num_requests.to_string());
}
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_completed_output_file_content_supports_search_filter(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let jsonl_content = r#"{"custom_id":"match-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test 1"}]}}
+{"custom_id":"other-2","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test 2"}]}}
+{"custom_id":"match-3","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test 3"}]}}
+"#;
+ let file = upload_batch_input_file(&app, &user, jsonl_content).await;
+ let batch = create_batch_for_file(&app, &user, &file.id).await;
+ let batch_id = batch["id"].as_str().expect("batch id");
+ let output_file_id = batch["output_file_id"].as_str().expect("output file id");
+ let batch_uuid = Uuid::parse_str(batch_id.strip_prefix("batch_").unwrap_or(batch_id)).expect("valid batch uuid");
+
+ wait_for_batch_requests(&pool, batch_uuid).await;
+
+ sqlx::query(
+ r#"
+ UPDATE fusillade.requests
+ SET state = 'completed',
+ response_status = 200,
+ response_body = '{"choices":[{"message":{"content":"ok"}}]}',
+ completed_at = NOW()
+ WHERE batch_id = $1
+ "#,
+ )
+ .bind(batch_uuid)
+ .execute(&pool)
+ .await
+ .expect("Failed to complete requests");
+
+ let auth = add_auth_headers(&user);
+ let response = app
+ .get(&format!("/ai/v1/files/{}/content?search=match", output_file_id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await;
+
+ response.assert_status(axum::http::StatusCode::OK);
+ response.assert_header("X-Incomplete", "false");
+ response.assert_header("X-Last-Line", "2");
+ let body = response.text();
+ let lines: Vec<&str> = body.trim().lines().collect();
+ assert_eq!(lines.len(), 2);
+ for line in lines {
+ let item: serde_json::Value = serde_json::from_str(line).expect("valid output json");
+ let custom_id = item["custom_id"].as_str().expect("custom_id");
+ assert!(custom_id.contains("match"));
+ }
+ }
+
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_completed_output_file_content_skip_past_end_returns_empty_page(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let jsonl_content = r#"{"custom_id":"req-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test 1"}]}}
+{"custom_id":"req-2","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Test 2"}]}}
+"#;
+ let file = upload_batch_input_file(&app, &user, jsonl_content).await;
+ let batch = create_batch_for_file(&app, &user, &file.id).await;
+ let batch_id = batch["id"].as_str().expect("batch id");
+ let output_file_id = batch["output_file_id"].as_str().expect("output file id");
+ let batch_uuid = Uuid::parse_str(batch_id.strip_prefix("batch_").unwrap_or(batch_id)).expect("valid batch uuid");
+
+ wait_for_batch_requests(&pool, batch_uuid).await;
+
+ sqlx::query(
+ r#"
+ UPDATE fusillade.requests
+ SET state = 'completed', response_status = 200, response_body = '{"choices":[]}', completed_at = NOW()
+ WHERE batch_id = $1
+ "#,
+ )
+ .bind(batch_uuid)
+ .execute(&pool)
+ .await
+ .expect("Failed to complete requests");
+
+ let auth = add_auth_headers(&user);
+ let response = app
+ .get(&format!("/ai/v1/files/{}/content?skip=10", output_file_id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await;
+
+ response.assert_status(axum::http::StatusCode::OK);
+ response.assert_header("X-Incomplete", "false");
+ response.assert_header("X-Last-Line", "10");
+ assert_eq!(response.text(), "");
+ }
+
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_input_file_content_search_preserves_openai_jsonl_shape(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4-model", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let jsonl_content = r#"{"custom_id":"request-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Hello 1"}]}}
+{"custom_id":"request-2","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Hello 2"}]}}
+{"custom_id":"request-3","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Hello 3"}]}}
+"#;
+
+ let file = upload_batch_input_file(&app, &user, jsonl_content).await;
+ let auth = add_auth_headers(&user);
+ let response = app
+ .get(&format!("/ai/v1/files/{}/content?search=request-2", file.id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await;
+
+ response.assert_status(axum::http::StatusCode::OK);
+ response.assert_header("X-Incomplete", "false");
+
+ let body = response.text();
+ let lines: Vec<&str> = body.trim().lines().collect();
+ assert_eq!(lines.len(), 1);
+ let item: serde_json::Value = serde_json::from_str(lines[0]).expect("valid input json");
+ assert_eq!(item["custom_id"], "request-2");
+ assert_eq!(item["method"], "POST");
+ assert_eq!(item["url"], "/v1/chat/completions");
+ assert_eq!(item["body"]["model"], "gpt-4");
+ }
+
+ #[sqlx::test]
+ #[test_log::test]
+ async fn test_deleted_input_file_content_returns_not_found(pool: PgPool) {
+ let (app, _bg_services) = create_test_app(pool.clone(), false).await;
+ let user = create_test_user_with_roles(&pool, vec![Role::StandardUser, Role::BatchAPIUser]).await;
+ let group = create_test_group(&pool).await;
+ add_user_to_group(&pool, user.id, group.id).await;
+
+ let deployment = create_test_deployment(&pool, user.id, "gpt-4-model", "gpt-4").await;
+ add_deployment_to_group(&pool, deployment.id, group.id, user.id).await;
+
+ let jsonl_content = r#"{"custom_id":"request-1","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4","messages":[{"role":"user","content":"Hello"}]}}
+"#;
+ let file = upload_batch_input_file(&app, &user, jsonl_content).await;
+ let auth = add_auth_headers(&user);
+
+ let delete_response = app
+ .delete(&format!("/ai/v1/files/{}", file.id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await;
+
+ delete_response.assert_status(axum::http::StatusCode::OK);
+ let body: serde_json::Value = delete_response.json();
+ assert_eq!(body["deleted"], true);
+ assert_eq!(body["id"], file.id);
+
+ app.get(&format!("/ai/v1/files/{}/content", file.id))
+ .add_header(&auth[0].0, &auth[0].1)
+ .add_header(&auth[1].0, &auth[1].1)
+ .await
+ .assert_status(axum::http::StatusCode::NOT_FOUND);
+ }
+
#[test]
fn test_file_upload_error_into_http_error_stream_interrupted() {
let err = super::FileUploadError::StreamInterrupted {
@@ -3776,6 +4582,8 @@ mod tests {
let files = body["data"].as_array().unwrap();
assert!(!files.is_empty(), "Expected at least one personal file");
for file in files {
+ assert!(file.get("status").is_some(), "status should be present");
+ assert!(file.get("status_details").is_some(), "status_details should be present");
assert!(
file.get("context_name").is_some() && !file["context_name"].is_null(),
"context_name should be present even in personal context"
diff --git a/dwctl/src/api/models/files.rs b/dwctl/src/api/models/files.rs
index 952c6fb22..27f070d5a 100644
--- a/dwctl/src/api/models/files.rs
+++ b/dwctl/src/api/models/files.rs
@@ -78,6 +78,10 @@ pub struct FileResponse {
#[schema(example = "batch_requests.jsonl")]
pub filename: String,
pub purpose: Purpose,
+ #[schema(example = "processed")]
+ pub status: String,
+ #[schema(example = "")]
+ pub status_details: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<i64>, // Unix timestamp
diff --git a/dwctl/src/batch_result_cache.rs b/dwctl/src/batch_result_cache.rs
new file mode 100644
index 000000000..bd6760366
--- /dev/null
+++ b/dwctl/src/batch_result_cache.rs
@@ -0,0 +1,327 @@
+use crate::blob_storage::BlobStorageClient;
+use crate::config::Config;
+use crate::errors::{Error, Result};
+use axum::{
+ body::Body,
+ http::{HeaderValue, StatusCode},
+ response::Response,
+};
+use fusillade::{BatchId, FileContentItem, FileId, ReqwestHttpClient, Storage};
+use futures::StreamExt;
+use serde::Serialize;
+use sha2::{Digest, Sha256};
+use sqlx_pool_router::PoolProvider;
+use uuid::Uuid;
+
+const CACHE_NAMESPACE: &str = "batch-results-cache";
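The `Sha256` and `Uuid` imports above suggest content-addressed cache keys under this namespace. A hedged guess at the key scheme (the exact layout is an assumption, not shown in this section):

```rust
// Hypothetical key derivation: namespace / file id / digest of the search
// filter, so different ?search= values map to distinct cached objects.
fn cache_object_key(file_id: Uuid, search: Option<&str>) -> String {
    let mut hasher = Sha256::new();
    hasher.update(search.unwrap_or("").as_bytes());
    format!("{CACHE_NAMESPACE}/{}/{:x}.jsonl", file_id, hasher.finalize())
}
```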
+
+async fn cache_client(config: &Config) -> Result