feat: file upload to object storage and then async file verification #954

Open

heldtogether wants to merge 9 commits into doublewordai:main from heldtogether:feat/async-file-varification

Conversation

heldtogether commented Apr 7, 2026

Summary

This PR introduces object storage as an optional backend for batch file uploads, decoupling upload latency from Postgres connection throughput.

When batches.files.storage_backend=object_store:

  • Uploads are persisted immediately to object storage (S3-compatible).
  • API returns a file_id right away.
  • Existing JSONL validation + template insertion runs asynchronously in a background ingest job.
  • Batch creation is gated on ingest state to prevent using unprocessed/failed files.

It also adds an S3-backed pull-through cache for immutable batch download results.

Completed batch downloads now build JSONL from Postgres on the first request, write the result to object storage, and serve subsequent identical requests from cache. In-progress batches still read directly from Postgres.
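
A minimal sketch of that pull-through read path, with stubs standing in for the real object-store and Postgres helpers (the PR's actual names live in dwctl/src/batch_result_cache.rs and are not shown here):

// Stubs for illustration only: the real code reads/writes object storage
// and builds JSONL from Postgres.
async fn get_cached(_key: &str) -> std::io::Result<Option<Vec<u8>>> { Ok(None) }
async fn put_cached(_key: &str, _bytes: &[u8]) -> std::io::Result<()> { Ok(()) }
async fn build_from_postgres(_key: &str) -> std::io::Result<Vec<u8>> { Ok(b"{}\n".to_vec()) }

async fn cached_results(key: &str) -> std::io::Result<Vec<u8>> {
    if let Some(bytes) = get_cached(key).await? {
        return Ok(bytes); // hit: serve the immutable result from object storage
    }
    let bytes = build_from_postgres(key).await?; // first request: build JSONL once
    put_cached(key, &bytes).await?;              // persist for identical requests
    Ok(bytes)
}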

When storage_backend=postgres, existing behavior remains unchanged.

Problem

Previously, both upload and download paths were DB-bound:

  • Uploads were parsed and inserted asynchronously while still holding DB capacity.
  • Throughput was constrained by Neon/Postgres connections rather than storage I/O.
  • Upload latency and DB pressure were tightly coupled.

What Changed

New object storage client

  • Added dwctl/src/blob_storage.rs.
  • Implements S3-compatible put_object / get_object with configurable endpoint, bucket, region, path style, and timeouts.
  • Keys are deterministic per file: <file_id>.jsonl (a client sketch follows below).
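
The client itself isn't quoted in this PR body; a plausible shape using the aws-sdk-s3 crate added in Cargo.toml (the struct and method signatures here are illustrative, not the PR's exact code):

use aws_sdk_s3::{primitives::ByteStream, Client};

// Illustrative blob client; the real one also carries region, path style,
// and timeout configuration.
struct BlobStorage {
    client: Client,
    bucket: String,
}

impl BlobStorage {
    // Deterministic per-file key, as described above.
    fn key(file_id: uuid::Uuid) -> String {
        format!("{file_id}.jsonl")
    }

    async fn put_object(&self, key: &str, bytes: Vec<u8>) -> Result<(), aws_sdk_s3::Error> {
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(ByteStream::from(bytes))
            .send()
            .await?;
        Ok(())
    }

    async fn get_object(&self, key: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let resp = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
        Ok(resp.body.collect().await?.into_bytes().to_vec())
    }
}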

New ingest tracking table

  • Added migration dwctl/migrations/090_add_file_ingest_jobs.sql:
    • file_ingest_jobs(file_id, object_key, status, error_message, created_at, updated_at)
    • statuses: pending | processing | processed | failed
    • index on status (a plausible reconstruction follows below)
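
The migration file isn't quoted in this thread; a plausible reconstruction consistent with the columns and statuses listed above (not the actual contents of 090_add_file_ingest_jobs.sql; column types are assumed):

CREATE TABLE file_ingest_jobs (
    file_id UUID PRIMARY KEY,
    object_key TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'processing', 'processed', 'failed')),
    error_message TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_file_ingest_jobs_status ON file_ingest_jobs (status);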

Upload handler flow split

  • Updated dwctl/src/api/handlers/files.rs:
    • Added object-store upload path behind config flag.
    • For object-store mode:
      1. store raw JSONL blob
      2. insert placeholder file record
      3. set ingest status pending
      4. enqueue ingest-file task
      5. return file_id
    • Added ingest status helpers and async ingest pipeline (both halves are sketched after this list):
      • download blob
      • run current validation/normalization
      • check model access
      • strip disallowed fields like priority
      • insert templates in DB
      • set status processed or failed (+ error message)
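
A compact sketch of the two halves, with stub helpers standing in for the PR's real blob client, DB calls, and task queue (all names here are hypothetical):

use uuid::Uuid;

type Result<T> = std::result::Result<T, String>;

// Stubs for illustration; the real implementations live in blob_storage.rs,
// files.rs, and tasks.rs.
async fn put_blob(_key: &str, _bytes: Vec<u8>) -> Result<()> { Ok(()) }
async fn get_blob(_key: &str) -> Result<Vec<u8>> { Ok(Vec::new()) }
async fn insert_placeholder_file(_id: Uuid) -> Result<()> { Ok(()) }
async fn set_ingest_status(_id: Uuid, _status: &str, _error: Option<&str>) -> Result<()> { Ok(()) }
async fn enqueue_ingest_task(_id: Uuid, _key: &str) -> Result<()> { Ok(()) }
async fn validate_and_insert_templates(_bytes: &[u8]) -> Result<()> { Ok(()) }

// Request path: persist the blob, record state, enqueue, return immediately.
async fn upload_via_object_store(file_id: Uuid, raw_jsonl: Vec<u8>) -> Result<Uuid> {
    let key = format!("{file_id}.jsonl");
    put_blob(&key, raw_jsonl).await?;                   // 1. store raw JSONL blob
    insert_placeholder_file(file_id).await?;            // 2. placeholder file record
    set_ingest_status(file_id, "pending", None).await?; // 3. ingest status pending
    enqueue_ingest_task(file_id, &key).await?;          // 4. enqueue ingest-file task
    Ok(file_id)                                         // 5. return file_id right away
}

// Worker path: download, reuse the existing validation, end in a terminal state.
async fn ingest_file(file_id: Uuid, key: &str) -> Result<()> {
    set_ingest_status(file_id, "processing", None).await?;
    let bytes = get_blob(key).await?;
    match validate_and_insert_templates(&bytes).await {
        Ok(()) => set_ingest_status(file_id, "processed", None).await,
        Err(e) => set_ingest_status(file_id, "failed", Some(e.as_str())).await,
    }
}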

Download handler

  • Added a shared batch-result cache layer in dwctl
  • Added deterministic cache keys based on (file_id, search, status)
  • Cached full filtered JSONL payloads and sliced them for paginated requests so skip/limit do not fragment the cache (see the key/slice sketch after this list)
  • Applied caching to:
    • /ai/v1/batches/{batch_id}/results for terminal batches
    • /ai/v1/files/{file_id}/content for terminal output/error files
  • Kept Postgres streaming for:
    • in-progress batch results
    • in-progress output/error files
    • input file downloads
  • Extended the object-store client with:
    • miss-aware reads
    • raw byte writes
    • prefix invalidation
  • Invalidated cached output/error variants on batch deletion
  • Changed deleted-batch semantics so output/error file downloads now return 404 after batch deletion
  • Added cache helper unit tests and a regression test for deleted batch output download behavior
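
A sketch of the key derivation and slice-based pagination (function names and the key format are illustrative; the real helpers live in dwctl/src/batch_result_cache.rs):

// Deterministic key: skip/limit are deliberately excluded so that one cached
// payload serves every page of the same (file_id, search, status) query.
fn cache_key(file_id: uuid::Uuid, search: Option<&str>, status: Option<&str>) -> String {
    format!(
        "results/{file_id}/search={}/status={}",
        search.unwrap_or(""),
        status.unwrap_or("")
    )
}

// Slice a cached JSONL payload by line for a paginated request.
fn paginate_jsonl(payload: &str, skip: usize, limit: usize) -> String {
    payload
        .lines()
        .skip(skip)
        .take(limit)
        .map(|line| format!("{line}\n"))
        .collect()
}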

Background task integration

  • Updated dwctl/src/tasks.rs:
    • Registers/runs new ingest_file_job worker alongside existing workers.

Batch creation gating

  • Updated dwctl/src/api/handlers/batches.rs:
    • If file has ingest status:
      • pending/processing => reject as not ready
      • failed => reject with ingest error
      • processed => allow normal flow

Config support

  • Updated dwctl/src/config.rs:
    • Added file storage backend enum and object store config (sketched after this list).
    • Added config validation requiring object-store config when backend is object_store.
  • Updated config docs/defaults (config.yaml) and tests accordingly.
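
An assumed shape for those config additions (field names are illustrative; only storage_backend=object_store is confirmed by the PR description):

use serde::Deserialize;

#[derive(Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "snake_case")]
enum FileStorageBackend {
    Postgres,
    ObjectStore,
}

#[derive(Deserialize)]
struct ObjectStoreConfig {
    endpoint: String,
    bucket: String,
    region: String,
    force_path_style: bool,
}

#[derive(Deserialize)]
struct FilesConfig {
    storage_backend: FileStorageBackend,
    object_store: Option<ObjectStoreConfig>,
}

impl FilesConfig {
    // Mirrors the validation rule above: object-store config is required
    // whenever the backend is object_store.
    fn validate(&self) -> Result<(), String> {
        if self.storage_backend == FileStorageBackend::ObjectStore && self.object_store.is_none() {
            return Err("object_store config is required when storage_backend=object_store".into());
        }
        Ok(())
    }
}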

Key Design Decisions

  • Raw blob first, parse later: minimizes upload request critical path and DB contention.
  • Reuse existing validation logic in async job: preserves behavior parity and avoids dual parsing code paths.
  • Explicit ingest state in DB: gives durable coordination between upload, worker, and batch creation.
  • Feature via config switch: supports gradual rollout and quick fallback to Postgres-only mode.

Tradeoffs

  • Pros
    • Faster, more predictable upload latency.
    • Reduced connection pool pressure during upload spikes.
    • Better separation of concerns (storage I/O vs DB ingest).
  • Cons
    • Eventual consistency: file exists before templates are query-ready.
    • More moving parts (object store + worker queue + status table).
    • Potential orphaned blobs/placeholders if retries/external outages occur.

Failure Modes and Handling

  • Object store write fails: request fails (no queued ingest).
  • Enqueue fails after upload: request fails; blob may exist without processing.
  • Worker transient failures (DB/object store): retried via task error classification.
  • Validation/model-access failures: status set failed with message; batch creation blocked.
  • Status drift risks mitigated by explicit transitions (pending -> processing -> processed/failed); a compare-and-swap sketch follows below.
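
One way to make those transitions race-safe with sqlx (a sketch under the table/column names assumed in the migration above, not the PR's actual query):

// Claim a pending job: the WHERE clause turns the update into a
// compare-and-swap, so two workers cannot both move it to `processing`.
async fn claim_job(pool: &sqlx::PgPool, file_id: uuid::Uuid) -> Result<bool, sqlx::Error> {
    let result = sqlx::query(
        "UPDATE file_ingest_jobs
         SET status = 'processing', updated_at = now()
         WHERE file_id = $1 AND status = 'pending'",
    )
    .bind(file_id)
    .execute(pool)
    .await?;
    Ok(result.rows_affected() == 1) // false: already claimed or not pending
}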

heldtogether changed the title from "Feat/async file varification" to "File upload to object storage and then async file verification" on Apr 7, 2026.
Comment thread on dwctl/Cargo.toml

[dependencies]
axum = { version = "0.8", features = ["multipart"] }
- fusillade = { version = "14.2.1" }
+ fusillade = { version = "14.2.1", path = "../../fusillade" }
Author

I assume this needs bumping when https://github.com/doublewordai/fusillade/pull/207/changes is merged in.

// If asynchronous file ingestion is enabled, gate batch creation on ingest state.
if let Some(status) = crate::api::handlers::files::get_file_ingest_status(state.db.write(), file_id).await? {
    match status.as_str() {
        "pending" | "processing" => {
Author

Consider waiting for async processing to complete if we don't want to hard-fail at this point.

heldtogether marked this pull request as ready for review on April 7, 2026 at 13:48.
heldtogether changed the title from "File upload to object storage and then async file verification" to "feat: file upload to object storage and then async file verification" on Apr 7, 2026.
Contributor

Copilot AI left a comment

Pull request overview

This PR adds an optional S3-compatible object storage backend for batch file uploads (async ingest/verification) and introduces an object-store-backed cache for completed batch/file result downloads to reduce Postgres load and decouple upload latency from DB throughput.

Changes:

  • Add object storage client + config, and a new file_ingest_jobs table to track async ingestion state.
  • Split file upload flow: store raw JSONL in object storage, create a placeholder file record, enqueue an ingest job, and gate batch creation on ingest completion.
  • Add completed-result pull-through caching to object storage for batch results and output/error file content, with invalidation on deletions.

Reviewed changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 9 comments.

Summary per file:

  • dwctl/src/test/utils.rs: Updates test app state to include new task state fields (db_pool, config).
  • dwctl/src/test/mod.rs: Updates task state construction in tests for new background worker requirements.
  • dwctl/src/tasks.rs: Extends task runner to register and run the new ingest-file worker.
  • dwctl/src/lib.rs: Wires new modules and passes config/db pool into background task state.
  • dwctl/src/config.rs: Adds object store config + backend enum + validation + tests.
  • dwctl/src/blob_storage.rs: Implements S3-compatible blob client (put/get/delete/prefix delete).
  • dwctl/src/batch_result_cache.rs: Adds deterministic cache keys, JSONL serialization, slice/pagination helpers, and cache read/build/invalidate logic.
  • dwctl/src/api/models/files.rs: Extends file API response shape with status and status_details.
  • dwctl/src/api/handlers/files.rs: Adds async ingest pipeline, object-store upload path, caching for completed output/error downloads, and cache invalidation on delete.
  • dwctl/src/api/handlers/batches.rs: Gates batch creation on ingest state; adds caching for completed batch results; invalidates cache on batch delete.
  • dwctl/migrations/090_add_file_ingest_jobs.sql: Creates file_ingest_jobs table and status index.
  • dwctl/Cargo.toml: Adds AWS SDK deps; changes fusillade dependency configuration.
  • dashboard/src/components/features/batches/FilesTable/columns.tsx: Adds UI disable/tooltip behavior based on file status.
  • dashboard/src/api/control-layer/types.ts: Updates dashboard FileObject type with status and status_details.
  • config.yaml: Documents new storage backend and object store config options.
  • Cargo.lock: Locks new AWS SDK dependency graph updates.

Comment on lines +434 to +443
pub struct IngestFileInput {
    pub file_id: Uuid,
    pub object_key: String,
    pub endpoint: String,
    pub api_key: String,
    pub api_key_id: Uuid,
    pub uploaded_by: String,
    pub filename: String,
    pub size_bytes: i64,
}
Copilot AI Apr 7, 2026

IngestFileInput includes the raw api_key, which will be persisted as part of the underway job payload. That stores a sensitive credential at rest in the job queue tables. Prefer passing only api_key_id (and any other identifiers) and fetching/decrypting the API key inside the worker step from the database, or using a short-lived token mechanism, so secrets aren’t serialized into job inputs.

Comment on lines +564 to +571
let bytes = client
    .get_file_bytes(&input.object_key)
    .await
    .map_err(|e| TaskError::Retryable(format!("download blob for ingest: {e}")))?;

let content = std::str::from_utf8(&bytes).map_err(|e| TaskError::Fatal(format!("Invalid UTF-8 in upload: {e}")))?;

let mut conn = db_pool
Copilot AI Apr 7, 2026

The ingest worker downloads the entire uploaded blob into memory (get_file_bytes -> Vec<u8>), then converts to UTF-8 and iterates lines. This can cause large memory spikes/OOM for big files (especially if limits.files.max_file_size=0 allows unlimited uploads). Consider streaming the S3 body (process line-by-line from a reader) or enforcing a non-zero max file size when storage_backend=object_store to keep memory bounded.
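
A sketch of the streaming alternative, assuming aws-sdk-s3's ByteStream with the rt-tokio feature (ByteStream::into_async_read); this is not code from the PR:

use aws_sdk_s3::Client;
use tokio::io::AsyncBufReadExt;

// Process the S3 body line-by-line instead of buffering the whole blob.
async fn ingest_streaming(client: &Client, bucket: &str, key: &str) -> Result<(), Box<dyn std::error::Error>> {
    let resp = client.get_object().bucket(bucket).key(key).send().await?;
    let mut lines = resp.body.into_async_read().lines();
    while let Some(line) = lines.next_line().await? {
        // validate/normalize each JSONL record here with bounded memory
        let _record: serde_json::Value = serde_json::from_str(&line)?;
    }
    Ok(())
}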

Comment on lines +1151 to +1156
let file_id = Uuid::new_v4();
let tmp_path = format!("/tmp/dwctl-upload-{}.jsonl", file_id);
let mut tmp_file = tokio::fs::File::create(&tmp_path).await.map_err(|e| Error::Internal {
    operation: format!("create temp upload file: {e}"),
})?;
let mut saw_file = false;
Copilot AI Apr 7, 2026

The object-store upload path writes the multipart file to a temp path under /tmp but only removes it after a successful put_file_from_path. Any early return (invalid purpose, payload too large, multipart parse errors, S3 upload error, etc.) will leak the temp file and can fill disk over time. Use a tempfile that cleans up on drop and/or ensure remove_file runs in a drop guard / scopeguard on all exit paths.
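
One way to get that drop-guard behavior with the tempfile crate (a sketch, not this PR's code; put_file_from_path is the PR's upload helper referenced above):

// NamedTempFile unlinks the file when dropped, so every early return
// (invalid purpose, multipart error, S3 failure) still cleans up.
let tmp = tempfile::NamedTempFile::new().map_err(|e| Error::Internal {
    operation: format!("create temp upload file: {e}"),
})?;
// ... write the multipart body to tmp.path() instead of a bare /tmp path ...
client.put_file_from_path(&object_key, tmp.path()).await?;
// `tmp` drops here on success or on any `?` above; the temp file is removed.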

}

// If asynchronous file ingestion is enabled, gate batch creation on ingest state.
if let Some(status) = crate::api::handlers::files::get_file_ingest_status(state.db.write(), file_id).await? {
Copilot AI Apr 7, 2026

This ingest-status gating read uses state.db.write() even though it’s a read-only query. That unnecessarily consumes primary connections (and bypasses replicas if configured). Use state.db.read() here for consistency with the rest of this file (e.g., other read paths use state.db.read().acquire() / state.db.read()).

Suggested change
- if let Some(status) = crate::api::handlers::files::get_file_ingest_status(state.db.write(), file_id).await? {
+ if let Some(status) = crate::api::handlers::files::get_file_ingest_status(state.db.read(), file_id).await? {

}

let config = state.current_config();
batch_result_cache::invalidate_cached_file_results(config.as_ref(), file_id).await?;
Copilot AI Apr 7, 2026

Cache invalidation is performed before deleting the file, and any object-store error will currently fail the entire delete operation. Since this is a best-effort derived cache, consider making invalidation non-fatal (log and continue) so users can still delete files when the object store is temporarily unavailable.

Suggested change
- batch_result_cache::invalidate_cached_file_results(config.as_ref(), file_id).await?;
+ if let Err(e) = batch_result_cache::invalidate_cached_file_results(config.as_ref(), file_id).await {
+     tracing::warn!(
+         file_id = %file_id,
+         error = %e,
+         "failed to invalidate cached batch results before file deletion; continuing with delete"
+     );
+ }

Comment on lines +1292 to +1295
    batch_result_cache::invalidate_cached_file_results(config.as_ref(), *output_file_id).await?;
}
if let Some(error_file_id) = batch.error_file_id {
    batch_result_cache::invalidate_cached_file_results(config.as_ref(), *error_file_id).await?;
Copilot AI Apr 7, 2026

Same as file deletion: cache invalidation errors will currently abort batch deletion. Since cached results are derivative, it’s usually better to treat invalidation as best-effort (log failures, continue deleting) to avoid making deletion dependent on object-store availability.

Suggested change
-     batch_result_cache::invalidate_cached_file_results(config.as_ref(), *output_file_id).await?;
- }
- if let Some(error_file_id) = batch.error_file_id {
-     batch_result_cache::invalidate_cached_file_results(config.as_ref(), *error_file_id).await?;
+     if let Err(err) = batch_result_cache::invalidate_cached_file_results(config.as_ref(), *output_file_id).await {
+         tracing::warn!(
+             batch_id = %batch_id,
+             file_id = %output_file_id,
+             error = %err,
+             "Failed to invalidate cached batch output file results during batch deletion"
+         );
+     }
+ }
+ if let Some(error_file_id) = batch.error_file_id {
+     if let Err(err) = batch_result_cache::invalidate_cached_file_results(config.as_ref(), *error_file_id).await {
+         tracing::warn!(
+             batch_id = %batch_id,
+             file_id = %error_file_id,
+             error = %err,
+             "Failed to invalidate cached batch error file results during batch deletion"
+         );
+     }

Comment thread on dwctl/Cargo.toml

[dependencies]
axum = { version = "0.8", features = ["multipart"] }
- fusillade = { version = "14.2.1" }
+ fusillade = { version = "14.2.1", path = "../../fusillade" }
Copilot AI Apr 7, 2026

fusillade is switched to a path dependency at ../../fusillade, which is outside this repository/workspace. That will break reproducible builds/CI unless the environment happens to have that directory present. Prefer using the crates.io version, adding fusillade as a workspace member within this repo, or using a git dependency pinned to a commit/tag.

Suggested change
- fusillade = { version = "14.2.1", path = "../../fusillade" }
+ fusillade = "14.2.1"

Comment on lines +83 to +98
async fn collect_stream_bytes<T, S, F>(mut stream: S, mut serialize: F) -> Result<Vec<u8>>
where
    S: futures::Stream<Item = fusillade::Result<T>> + Unpin,
    F: FnMut(T) -> Result<Vec<u8>>,
{
    let mut bytes = Vec::new();

    while let Some(item) = stream.next().await {
        let item = item.map_err(|e| Error::Internal {
            operation: format!("stream batch result data: {e}"),
        })?;
        bytes.extend(serialize(item)?);
    }

    Ok(bytes)
}
Copilot AI Apr 7, 2026

The cache build path collects the entire result stream into a single Vec<u8> before responding and before writing to object storage. For large completed batches/output files this can create significant memory spikes and defeats the previous streaming behavior on the first request. Consider streaming directly into the object-store writer (multipart upload) while also streaming the response, or imposing a maximum cacheable payload size/falling back to streaming for large result sets.
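
A sketch of the size-cap fallback the comment suggests (the constant and the write helper are hypothetical names):

// Above the cap, skip the cache and keep the old streaming-from-Postgres path.
const MAX_CACHEABLE_BYTES: usize = 64 * 1024 * 1024;

if bytes.len() <= MAX_CACHEABLE_BYTES {
    write_cached_bytes(&cache_key, &bytes).await?;
}
// Larger result sets fall through and are streamed directly.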

Comment on lines +216 to +217
const viewDisabled = file.status === "error";
const disabledReason = file.status_details || "This file cannot be viewed.";
Copilot AI Apr 7, 2026

The UI only disables View/Download actions when file.status === "error", but in the new async-ingest flow files can be in "uploaded" (pending/processing). In that state the backend won’t have templates/results available yet, so these actions will likely show empty content or confusing errors. Consider also disabling (or showing a distinct “processing” tooltip/state) when status !== "processed".

Suggested change
- const viewDisabled = file.status === "error";
- const disabledReason = file.status_details || "This file cannot be viewed.";
+ const viewDisabled = file.status !== "processed";
+ const disabledReason =
+   file.status === "error"
+     ? file.status_details || "This file cannot be viewed."
+     : "This file is still processing and cannot be viewed yet.";
