diff --git a/CHANGELOG.md b/CHANGELOG.md index c19757e4b..295a2ff7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,17 +2,21 @@ All notable changes to this project will be documented in this file. -# Changelog +## Unreleased -# Changelog +### appkit (files plugin) — per-volume auth mode -# Changelog +* **feat(files):** add per-volume `auth` field (`VolumeConfig.auth`) and plugin-level `auth` default (`IFilesConfig.auth`) for selecting between service-principal and on-behalf-of-user execution. Resolution order: `volume.auth ?? plugin.auth ?? "service-principal"`. OBO mode wires HTTP route handlers and `VolumeHandle.asUser` through `runInUserContext` so SDK calls execute as the end user. +* **feat(files):** add `files.auth_mode` OpenTelemetry span attribute on every operation, set to either `"service-principal"` or `"on-behalf-of-user"` for trace filtering. The attribute lands on the connector's existing `files.` span (no duplicate spans). +* **feat(files)!:** `appKit.files("vol").asUser(req).list()` now executes the SDK call as the **end user** (previously the SDK still ran as the service principal — only the policy user was swapped). Programmatic callers that relied on SP credentials post-`asUser(req)` must remove the `asUser` wrap. +* **fix(files):** `asUser(req)` now requires both `x-forwarded-user` AND `x-forwarded-access-token` in production; throws `AuthenticationError.missingToken` when either is missing. Previously, a request with only the user header silently fell back to SP credentials at the SDK level while the policy saw a real-user identity — a privilege-confusion bug. Dev fallback marks the policy user as `isServicePrincipal: true`. +* **fix(files):** OBO volume read responses are no longer cached. SP volume reads still cache. Trade-off: every OBO read hits the SDK; in exchange, no cross-user staleness. (Follow-up: per-(volume, path) generation counter for OBO list cache.) +* **fix(files):** write handlers (`upload`, `mkdir`, `delete`) now `await` cache invalidation before sending the HTTP response, eliminating a write→read race within the same client tick. +* **chore(files):** remove undocumented `bypassPolicy` option on `createVolumeAPI`. Zero consumers in `packages/` or `apps/`; no migration needed. -# Changelog +#### Honest limitation -# Changelog - -# Changelog +Programmatic calls on an OBO volume **without** `asUser(req)` (i.e. `appKit.files("obo-vol").list()`) cannot synthesize a user identity and continue to execute against the service principal client at the call site. For programmatic per-user execution, use `asUser(req)`. The OBO volume default applies to **HTTP route traffic**, where the request headers are available. ## [0.26.0](https://github.com/databricks/appkit/compare/v0.25.1...v0.26.0) (2026-04-27) diff --git a/apps/dev-playground/app.yaml b/apps/dev-playground/app.yaml index e58e71a31..6303c9419 100644 --- a/apps/dev-playground/app.yaml +++ b/apps/dev-playground/app.yaml @@ -27,3 +27,8 @@ env: valueFrom: volume - name: DATABRICKS_VOLUME_IMPLICIT valueFrom: volume + # OBO demo: same physical volume; auth: "on-behalf-of-user" routes + # HTTP traffic through runInUserContext so SDK calls execute as the + # end user. + - name: DATABRICKS_VOLUME_OBO_DEMO + valueFrom: volume diff --git a/apps/dev-playground/client/src/routes/policy-matrix.route.tsx b/apps/dev-playground/client/src/routes/policy-matrix.route.tsx index c299aafc1..55fcb9b51 100644 --- a/apps/dev-playground/client/src/routes/policy-matrix.route.tsx +++ b/apps/dev-playground/client/src/routes/policy-matrix.route.tsx @@ -86,6 +86,10 @@ function PolicyMatrixRoute() { const [runningAll, setRunningAll] = useState(false); const [spResult, setSpResult] = useState(null); const [oboResult, setOboResult] = useState(null); + const [oboVolumeResult, setOboVolumeResult] = useState(null); + const [oboVolumeHttpResult, setOboVolumeHttpResult] = useState( + null, + ); useEffect(() => { fetch("/whoami") @@ -197,6 +201,40 @@ function PolicyMatrixRoute() { setOboResult(JSON.stringify(await r.json(), null, 2)); }, []); + /** + * Programmatic OBO-volume smoke. Calls the dev-playground's + * `/policy/obo-volume` route which hits `appkit.files("obo_demo")` — + * a volume configured with `auth: "on-behalf-of-user"` — through both + * `asUser(req)` and the bare callable. The browser automatically + * forwards `x-forwarded-user` / `x-forwarded-access-token` when running + * behind the Databricks Apps reverse proxy; locally they're absent and + * the dev fallback reports `service-principal` execution. + */ + const runOboVolumeSmoke = useCallback(async () => { + setOboVolumeResult("…"); + const r = await fetch("/policy/obo-volume"); + setOboVolumeResult(JSON.stringify(await r.json(), null, 2)); + }, []); + + /** + * Direct HTTP probe against the OBO volume's `/list` route. Confirms + * end-to-end that the route handler routes the SDK call through + * `runInUserContext` when the headers are present, and returns 401 (or + * 403, in dev fallback) when they're missing. + */ + const runOboVolumeHttp = useCallback(async () => { + setOboVolumeHttpResult("…"); + try { + const r = await fetch(`/api/files/obo_demo/list`); + const body = await r.json().catch(() => ({}) as Record); + setOboVolumeHttpResult( + JSON.stringify({ httpStatus: r.status, body }, null, 2), + ); + } catch (err) { + setOboVolumeHttpResult(err instanceof Error ? err.message : String(err)); + } + }, []); + const reset = useCallback(() => setState(initialState), [initialState]); return ( @@ -297,6 +335,41 @@ function PolicyMatrixRoute() { + +
+

+ Per-volume OBO mode (auth: "on-behalf-of-user") +

+

+ Hits the obo_demo volume — configured with{" "} + auth: "on-behalf-of-user" — to confirm SDK calls + execute as the end user when the request carries{" "} + x-forwarded-access-token +{" "} + x-forwarded-user. In the deployed Databricks App those + headers are injected by the platform reverse proxy. Locally they're + absent and the dev-mode fallback applies: HTTP returns 403{" "} + (the usersOnly policy denies SP traffic) and the + programmatic path runs as the SP. +

+
+ + +
+
+ + +
+
); diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index 94f1cc128..bb42692ef 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -48,6 +48,15 @@ const adminOnly: FilePolicy = (action, _resource, user) => { return true; }; +/** + * OBO demo policy: deny anything running as the SP (including the dev + * fallback when no `x-forwarded-access-token` is present). Only real + * end-users (`isServicePrincipal: false`) get through. + */ +const usersOnly: FilePolicy = (_action, _resource, user) => { + return user.isServicePrincipal !== true; +}; + createApp({ plugins: [ server(), @@ -79,6 +88,14 @@ createApp({ write_only: { policy: files.policy.not(files.policy.publicRead()) }, // no explicit policy → falls back to publicRead() + startup warning implicit: {}, + // OBO demo volume — auth: "on-behalf-of-user" routes HTTP traffic + // through `runInUserContext` so SDK calls execute with the end + // user's access token. The `usersOnly` policy denies any traffic + // that wasn't authenticated via `x-forwarded-access-token`. + obo_demo: { + auth: "on-behalf-of-user", + policy: usersOnly, + }, }, }), serving(), @@ -194,6 +211,43 @@ createApp({ results, }); }); + + /** + * Per-volume OBO mode demo. Hits the `obo_demo` volume — configured + * with `auth: "on-behalf-of-user"` — to confirm: + * + * 1. With a forwarded user identity, HTTP routes execute the SDK + * call as the end user (request goes through `runInUserContext`). + * 2. Without `x-forwarded-access-token`, production returns 401; + * development falls back to the SP and the `usersOnly` policy + * rejects with 403. + * 3. Programmatic `appkit.files("obo_demo").asUser(req).list()` runs + * inside the same user context. + * + * Returns the HTTP status, body, and the user identity the server + * observes — so the policy-matrix client can render a clear + * pass/fail panel. + */ + app.get("/policy/obo-volume", async (req, res) => { + const xForwardedUser = req.header("x-forwarded-user") ?? null; + const xForwardedToken = + (req.header("x-forwarded-access-token")?.length ?? 0) > 0; + + const programmatic: ProbeResult[] = await runProbes([ + [ + "obo_demo", + "list", + () => appkit.files("obo_demo").asUser(req).list(), + ], + ]); + + res.json({ + mode: "on-behalf-of-user", + xForwardedUser, + xForwardedAccessTokenPresent: xForwardedToken, + programmatic, + }); + }); }); }, }).catch(console.error); diff --git a/docs/docs/api/appkit/Interface.FilePolicyUser.md b/docs/docs/api/appkit/Interface.FilePolicyUser.md index 0c4bae2fc..9fb22fa0d 100644 --- a/docs/docs/api/appkit/Interface.FilePolicyUser.md +++ b/docs/docs/api/appkit/Interface.FilePolicyUser.md @@ -10,6 +10,11 @@ Minimal user identity passed to the policy function. id: string; ``` +Identifier of the requesting caller. For end-user HTTP requests this is +the value of the `x-forwarded-user` header; for direct SDK calls and +header-less HTTP requests (which run as the service principal), this is +the service principal's ID. + *** ### isServicePrincipal? @@ -18,4 +23,25 @@ id: string; optional isServicePrincipal: boolean; ``` -`true` when the caller is the service principal (direct SDK call, not `asUser`). +`true` when the call is executing as the service principal — either a +direct SDK call (`appKit.files(...)`) or an HTTP request that arrived +without an `x-forwarded-user` / `x-forwarded-access-token` header. +Policy authors typically check this first to distinguish SP traffic +from end-user traffic. + +The flag reflects the **policy user** the plugin selects, which +combines the volume's effective `auth` mode with the headers on the +incoming request. The full matrix: + +| Volume `auth` | Path | Headers | `isServicePrincipal` | Notes | +| --------------------- | ------------------------------ | ----------------------------- | -------------------- | ---------------------------------------------------------------------------------------------- | +| `service-principal` | HTTP | `x-forwarded-user` present | `false` (or unset) | Pre-OBO behavior. Policy sees the end user but the SDK call still runs as the SP. | +| `service-principal` | HTTP | no `x-forwarded-user` | `true` | Headerless request — policy and SDK both run as the SP. | +| `on-behalf-of-user` | HTTP | valid token + user header | `false` | Real end-user execution. Policy sees the user; the SDK call also runs as the user. | +| `on-behalf-of-user` | HTTP | missing token, dev-fallback | `true` | Only reachable when `NODE_ENV === "development"` (prod returns 401). Treated as SP traffic. | +| any | Programmatic `asUser(req)` | `x-forwarded-user` present | `false` | `asUser` extracts the user; the SDK call runs as the user inside `runInUserContext`. | + +Programmatic calls without `asUser(req)` always set +`isServicePrincipal: true` because no request is available to derive a +user identity from. OBO volume defaults apply only to HTTP route +traffic; for programmatic per-user execution, use `asUser(req)`. diff --git a/docs/docs/plugins/files.md b/docs/docs/plugins/files.md index c823a5819..2e87d0341 100644 --- a/docs/docs/plugins/files.md +++ b/docs/docs/plugins/files.md @@ -14,7 +14,7 @@ File operations against Databricks Unity Catalog Volumes. Supports listing, read - Upload size limits with streaming enforcement - Automatic cache invalidation on write operations - Custom content type mappings -- Per-user execution context (OBO) +- **Per-volume auth modes**: each volume can run as the service principal (the default) or on behalf of the end user - **Access policies**: Per-volume policy functions that gate read and write operations ## Basic usage @@ -73,6 +73,11 @@ interface IFilesConfig { customContentTypes?: Record; /** Maximum upload size in bytes. Defaults to 5 GB. Inherited by all volumes. */ maxUploadSize?: number; + /** + * Plugin-level default auth mode. Volumes inherit this when they do not + * set `VolumeConfig.auth`. Defaults to `"service-principal"`. + */ + auth?: "service-principal" | "on-behalf-of-user"; } interface VolumeConfig { @@ -82,6 +87,11 @@ interface VolumeConfig { maxUploadSize?: number; /** Map of file extensions to MIME types for this volume. Overrides plugin-level default. */ customContentTypes?: Record; + /** + * Per-volume auth mode. Inherits from `IFilesConfig.auth` when not set; + * defaults to `"service-principal"`. + */ + auth?: "service-principal" | "on-behalf-of-user"; } ``` @@ -100,6 +110,77 @@ files({ }); ``` +### Auth modes + +Each volume runs in one of two auth modes. The mode determines which identity executes the underlying Unity Catalog SDK call — and therefore which UC grant applies: + +| Mode | SDK identity | Required UC grant on the volume | +| --- | --- | --- | +| `"service-principal"` (default) | the app's service principal | `WRITE_VOLUME` (or read-equivalent) on the **SP** | +| `"on-behalf-of-user"` | the end user from the request | `WRITE_VOLUME` (or read-equivalent) on the **end user** | + +#### Resolution order + +For each volume the plugin resolves the auth mode in this order: + +``` +VolumeConfig.auth > IFilesConfig.auth > "service-principal" +``` + +Set `IFilesConfig.auth` to flip the default for every volume in one place, and override individual volumes via `VolumeConfig.auth`. + +#### Service-principal mode (default) + +Every HTTP request executes as the app's service principal. The end-user identity (from `x-forwarded-user`) is still passed into the volume policy, but the SDK call uses the SP's credentials: + +```ts +files({ + volumes: { + exports: { + // auth is implicit: "service-principal" + policy: files.policy.publicRead(), + }, + }, +}); +``` + +Use SP mode for shared resources, app-managed exports, or any case where you want a single grant on the SP to govern all access. + +#### On-behalf-of-user mode + +Every HTTP request executes as the end user. The plugin pulls the user identity and access token from the headers Databricks Apps inject (`x-forwarded-user` and `x-forwarded-access-token`) and runs the SDK call inside `runInUserContext`: + +```ts +files({ + volumes: { + "user-uploads": { + auth: "on-behalf-of-user", + // The policy sees the real end user (isServicePrincipal: false). + // You can use the volume policy in addition to UC grants. + policy: (action, _resource, user) => + // Only allow real end users, never the SP. + !user.isServicePrincipal, + }, + }, +}); +``` + +Use OBO mode when the per-user UC grant is meaningful — e.g. enforcement of UC ACLs at the SDK layer, or audit trails that need to attribute the API call to the end user instead of the app's SP. + +#### Production vs development behavior + +| Environment | OBO request with **valid** token | OBO request with **missing** `x-forwarded-access-token` | +| --- | --- | --- | +| Production | Runs as the end user. | `401 Unauthorized` — no SDK call is made. | +| Development (`NODE_ENV === "development"`) | Runs as the end user. | Logs a warning, falls back to the SP, and continues. | + +The dev-mode fallback exists so local testing without a Databricks Apps reverse proxy continues to work; deployed apps always have the headers injected. + +#### Limitations + +- The plugin manifest's `getResourceRequirements()` declares `WRITE_VOLUME` on the **service principal** for every volume, regardless of the volume's `auth` mode. For OBO volumes, the actual permission requirement is on the **end user** — communicate this out-of-band (deployment runbooks, customer onboarding docs) until the plugin manifest schema gains a per-volume auth scope field. +- Cache keys include `getCurrentUserId()`, so OBO volumes get per-user cache isolation automatically. SP volumes share a single cache slice keyed by the SP id. + ### Permission model There are three layers of access control in the files plugin. Understanding how they interact is critical for securing your app: @@ -107,11 +188,14 @@ There are three layers of access control in the files plugin. Understanding how ``` ┌─────────────────────────────────────────────────┐ │ Unity Catalog grants │ -│ (WRITE_VOLUME on the SP — set at deploy time) │ +│ WRITE_VOLUME on the SP (auth: service-principal)│ +│ WRITE_VOLUME on the user (auth: on-behalf-of-user)│ ├─────────────────────────────────────────────────┤ │ Execution identity │ -│ HTTP routes → always service principal │ -│ Programmatic → SP by default, asUser() for OBO │ +│ Resolved per volume from VolumeConfig.auth ?? │ +│ IFilesConfig.auth ?? "service-principal". │ +│ asUser(req) is a hard override at the SDK │ +│ level for the programmatic API. │ ├─────────────────────────────────────────────────┤ │ File policies │ │ Per-volume (action, resource, user) → boolean │ @@ -119,13 +203,15 @@ There are three layers of access control in the files plugin. Understanding how └─────────────────────────────────────────────────┘ ``` -- **UC grants** control what the service principal can do at the Databricks level. These are set at deploy time via `app.yaml` resource bindings. The SP needs `WRITE_VOLUME` — the plugin declares this via resource requirements. -- **Execution identity** determines whose credentials are used for the actual API call. HTTP routes always use the SP. The programmatic API uses SP by default but supports `asUser(req)` for OBO. -- **File policies** are application-level checks evaluated **before** the API call. They receive the requesting user's identity (from the `x-forwarded-user` header) and decide allow/deny. This is the only gate that distinguishes between users on HTTP routes. +- **UC grants** control what an identity can do at the Databricks level. Which identity needs the grant depends on the volume's auth mode (see [Auth modes](#auth-modes)). For SP volumes, the SP needs `WRITE_VOLUME` (the plugin declares this in its manifest). For OBO volumes, the **end user** needs `WRITE_VOLUME` on the volume; the SP itself does not. +- **Execution identity** determines whose credentials are used for the actual API call. Each volume resolves to either the service principal or the end user, per its `auth` setting. The programmatic API also exposes `asUser(req)` to force per-user execution regardless of the volume's `auth`. +- **File policies** are application-level checks evaluated **before** the API call. They receive a `FilePolicyUser` describing the caller and decide allow/deny. On HTTP routes the policy user is selected based on the volume's `auth` mode and the request headers — see the [`isServicePrincipal` matrix](#policy-user-matrix). On SP volumes when `x-forwarded-user` is absent, the policy receives `{ id: , isServicePrincipal: true }` and decides whether to allow service-principal traffic. This is the only gate that distinguishes between users on HTTP routes. :::warning -Since HTTP routes always execute as the service principal, removing a user's UC `WRITE_VOLUME` grant has **no effect** on HTTP access — the SP's grant is what's used. Policies are how you restrict what individual users can do through your app. +For service-principal volumes, every HTTP request executes as the SP regardless of which user made it — so removing a user's UC `WRITE_VOLUME` grant has **no effect** on HTTP access. Policies are how you restrict what individual users can do through your app. + +For on-behalf-of-user volumes, requests execute as the requesting user — so each user must have `WRITE_VOLUME` on the volume themselves, and you can rely on UC grants in addition to policies. ::: @@ -209,6 +295,56 @@ files({ }); ``` +#### OBO policy example + +For on-behalf-of-user volumes, the policy receives `isServicePrincipal: false` whenever the request runs with a real end-user identity. A common pattern is to deny SP traffic outright so anonymous (header-less) calls can't reach the volume: + +```ts +import { type FilePolicy } from "@databricks/appkit"; + +// Deny anything running as the service principal — including the dev-mode +// fallback when no x-forwarded-access-token was provided. Real end users +// (with isServicePrincipal: false) get the configured access. +const usersOnly: FilePolicy = (_action, _resource, user) => { + return user.isServicePrincipal !== true; +}; + +files({ + volumes: { + "user-uploads": { + auth: "on-behalf-of-user", + policy: usersOnly, + }, + }, +}); +``` + +You can compose it with any other policy via `files.policy.all(...)` to add per-action gating: + +```ts +files({ + volumes: { + "user-uploads": { + auth: "on-behalf-of-user", + policy: files.policy.all(usersOnly, files.policy.publicRead()), + }, + }, +}); +``` + +#### Policy user matrix + +The plugin selects the policy user based on the volume's effective `auth` mode and the request headers. The full table: + +| Volume `auth` | Path | Headers | `isServicePrincipal` | Notes | +| --------------------- | --------------------------- | ----------------------------- | -------------------- | ---------------------------------------------------------------------------------------------- | +| `service-principal` | HTTP | `x-forwarded-user` present | `false` (or unset) | Pre-OBO behavior. Policy sees the end user but the SDK call still runs as the SP. | +| `service-principal` | HTTP | no `x-forwarded-user` | `true` | Headerless request — policy and SDK both run as the SP. | +| `on-behalf-of-user` | HTTP | valid token + user header | `false` | Real end-user execution. Policy sees the user; the SDK call also runs as the user. | +| `on-behalf-of-user` | HTTP | missing token, dev-fallback | `true` | Only reachable when `NODE_ENV === "development"` (prod returns 401). Treated as SP traffic. | +| any | Programmatic `asUser(req)` | `x-forwarded-user` present | `false` | `asUser` extracts the user; the SDK call runs as the user inside `runInUserContext`. | +| any | Programmatic (no `asUser`) | n/a | `true` | No request available to derive a user — runs as the SP. | + #### Enforcement - **HTTP routes**: Policy checked before every operation. Denied → `403` JSON response with `Policy denied "{action}" on volume "{volumeKey}"`. @@ -233,7 +369,7 @@ Dangerous MIME types (`text/html`, `text/javascript`, `application/javascript`, ## HTTP routes -Routes are mounted at `/api/files/*`. All routes execute as the service principal. Policy enforcement checks user identity (from the `x-forwarded-user` header) before allowing operations — see [Access policies](#access-policies). +Routes are mounted at `/api/files/*`. Each route resolves the volume's [auth mode](#auth-modes) and either executes as the service principal (the default) or wraps the SDK call in `runInUserContext` for OBO volumes. Before every operation the volume policy runs against the resolved policy user — see the [policy user matrix](#policy-user-matrix) for the exact mapping. See also [Access policies](#access-policies). | Method | Path | Query / Body | Response | | ------ | -------------------------- | ---------------------------- | ------------------------------------------------- | @@ -287,21 +423,38 @@ Write operations (`upload`, `mkdir`, `delete`) automatically invalidate the cach ## Programmatic API -The `exports()` API is a callable that accepts a volume key and returns a `VolumeHandle`. The handle exposes all `VolumeAPI` methods directly (service principal, logs a warning) and an `asUser(req)` method for OBO access (recommended). +The `files` plugin export is a callable that accepts a volume key and returns a `VolumeHandle`. The handle exposes all `VolumeAPI` methods directly and an `asUser(req)` method for opting into per-user execution. ```ts -// OBO access (recommended) +// Default — runs as the service principal, regardless of the volume's auth +// setting (no req is available to derive a user from). +const entries = await appkit.files("uploads").list(); + +// asUser(req) — runs as the end user, regardless of the volume's auth +// setting. Forces SDK calls into runInUserContext using the request's +// x-forwarded-user / x-forwarded-access-token headers. const entries = await appkit.files("uploads").asUser(req).list(); const content = await appkit.files("exports").asUser(req).read("report.csv"); -// Service principal access (logs a warning encouraging OBO) -const entries = await appkit.files("uploads").list(); - // Named accessor const vol = appkit.files.volume("uploads"); await vol.asUser(req).list(); ``` +### `asUser(req)` + +`asUser(req)` is the supported path for programmatic per-user execution. The returned API runs every method inside `runInUserContext` so the underlying `WorkspaceClient` is the user-token client — the SDK call executes as the user, not just the policy check. + +In production, `asUser(req)` throws `AuthenticationError.missingToken` when `x-forwarded-user` is absent. In development (`NODE_ENV === "development"`) it logs a warning and falls back to the service principal so local testing without a Databricks Apps reverse proxy keeps working — the fallback skips the `runInUserContext` wrap. + +:::warning Programmatic OBO without `asUser(req)` + +A volume configured with `auth: "on-behalf-of-user"` only routes through `runInUserContext` on the **HTTP route path**, where the request headers are available. A direct programmatic call — `appkit.files("obo-vol").list()` — has no request to derive an end-user identity from, so it executes against whatever client `getWorkspaceClient()` resolves to at the call site (typically the SP at the top level). + +For programmatic per-user execution, always use `asUser(req)`. The volume's `auth` mode controls HTTP traffic; `asUser(req)` controls programmatic traffic. + +::: + ### VolumeAPI methods | Method | Signature | Returns | @@ -369,9 +522,20 @@ interface FileResource { } interface FilePolicyUser { - /** User ID from the `x-forwarded-user` header. */ + /** + * Identifier of the requesting caller. For end-user HTTP requests this is + * the value of the `x-forwarded-user` header; for direct SDK calls and + * header-less HTTP requests (which run as the service principal), this + * is the service principal's ID. + */ id: string; - /** `true` when the caller is the service principal (direct SDK call, not `asUser`). */ + /** + * `true` when the call is executing as the service principal — either a + * direct SDK call (`appKit.files(...)` without `asUser`), an HTTP request + * with no forwarded headers, or the dev-mode fallback for an OBO volume + * with a missing token. See the [policy user matrix](#policy-user-matrix) + * for the full table. + */ isServicePrincipal?: boolean; } @@ -388,6 +552,11 @@ interface VolumeConfig { maxUploadSize?: number; /** Map of file extensions to MIME types for this volume. */ customContentTypes?: Record; + /** + * Per-volume auth mode. Inherits from `IFilesConfig.auth` when not set; + * defaults to `"service-principal"`. + */ + auth?: "service-principal" | "on-behalf-of-user"; } interface VolumeAPI { @@ -402,7 +571,10 @@ interface VolumeAPI { preview(filePath: string): Promise; } -/** Volume handle: all VolumeAPI methods (service principal) + asUser() for OBO. */ +/** + * Volume handle: all VolumeAPI methods (run as the service principal by + * default) + asUser() to force per-user execution at the SDK level. + */ type VolumeHandle = VolumeAPI & { asUser: (req: Request) => VolumeAPI; }; @@ -420,9 +592,12 @@ Built-in extensions: `.png`, `.jpg`, `.jpeg`, `.gif`, `.webp`, `.svg`, `.bmp`, ` ## User context -HTTP routes always execute as the **service principal** — the SP's Databricks credentials are used for all API calls. User identity is extracted from the `x-forwarded-user` header and passed to the volume's [access policy](#access-policies) for authorization. This means UC grants on the SP (not individual users) determine what operations are possible, while policies control what each user is allowed to do through the app. +HTTP routes execute as either the service principal or the end user, depending on the volume's [auth mode](#auth-modes): -The programmatic API returns a `VolumeHandle` that exposes all `VolumeAPI` methods directly (service principal) and an `asUser(req)` method for OBO access. Calling any method without `asUser()` logs a warning encouraging OBO usage but does not throw. OBO access is strongly recommended for production use. +- **Service-principal volumes** (the default): the SP's Databricks credentials are used for the API call. User identity is extracted from the `x-forwarded-user` header and passed to the volume's [access policy](#access-policies) for authorization, but the SDK call still runs as the SP. When the header is absent the policy is handed `{ id: , isServicePrincipal: true }` and decides whether to allow the call — in practice that branch only fires in development without a reverse proxy or when an upstream proxy is misconfigured, since real Databricks Apps runtimes always forward the header. UC grants on the **SP** determine what operations are possible. +- **On-behalf-of-user volumes**: the end user's access token (from `x-forwarded-access-token`) is used to mint the SDK client, so the API call runs with the user's identity. Both the policy and the SDK see the user. UC grants on the **end user** determine what operations are possible. In production, requests with a missing token return `401`; in development (`NODE_ENV === "development"`) they fall back to the SP with a warning. + +The programmatic API returns a `VolumeHandle` that exposes all `VolumeAPI` methods directly and an `asUser(req)` method for forcing per-user execution. Calling a method without `asUser()` runs the policy and the SDK call as the SP. `asUser(req)` is a hard override at the SDK level: it forces every subsequent call to execute as the end user inside `runInUserContext`, regardless of the volume's `auth` setting. In production, `asUser(req)` throws `AuthenticationError.missingToken` when the `x-forwarded-user` header is missing. In development it falls back to the service principal instead, so local testing without a reverse proxy continues to work. ## Resource requirements @@ -430,6 +605,8 @@ Volume resources are declared **dynamically** via `getResourceRequirements(confi For example, if `DATABRICKS_VOLUME_UPLOADS` and `DATABRICKS_VOLUME_EXPORTS` are set, calling `files()` generates two required volume resources validated at startup — no explicit `volumes` config needed. +The manifest declares the grant against the **service principal**. For OBO volumes (`auth: "on-behalf-of-user"`), the actual permission requirement is on the **end user** — communicate this out-of-band in your deployment documentation until the manifest schema gains a per-volume auth scope field. + ## Error responses All errors return JSON: diff --git a/packages/appkit/src/connectors/files/client.ts b/packages/appkit/src/connectors/files/client.ts index caa384af7..93203fdb6 100644 --- a/packages/appkit/src/connectors/files/client.ts +++ b/packages/appkit/src/connectors/files/client.ts @@ -1,3 +1,4 @@ +import { AsyncLocalStorage } from "node:async_hooks"; import { ApiError, type WorkspaceClient } from "@databricks/sdk-experimental"; import type { TelemetryOptions } from "shared"; import { createLogger } from "../../logging/logger"; @@ -24,6 +25,37 @@ import { const logger = createLogger("connectors:files"); +/** + * Ambient span-attribute propagation for `FilesConnector.traced()`. + * + * Callers (e.g. the plugin's `_withAuthModeAttributes` wrapper) set extra + * span attributes here via `runWithFilesSpanAttributes(attrs, fn)`. The + * connector's `traced()` decorator reads them and merges them into the + * span it creates around the SDK call. This lets the plugin tag spans with + * `files.auth_mode` without opening a duplicate `files.` span. + * + * AsyncLocalStorage is used so concurrent requests don't see each other's + * attributes. Outside an active scope, `getStore()` returns `undefined` and + * the connector falls back to the static attribute set. + */ +const filesSpanAttributesStorage = new AsyncLocalStorage< + Record +>(); + +/** + * Run `fn` with the supplied attributes attached to whatever span the + * `FilesConnector` opens for its SDK call. Used to propagate request-scoped + * attributes (e.g. `files.auth_mode`) onto the connector's span without + * opening a parent span — avoids the 2x span allocation that + * `startActiveSpan` parented otherwise. + */ +export function runWithFilesSpanAttributes( + attributes: Record, + fn: () => Promise, +): Promise { + return filesSpanAttributesStorage.run(attributes, fn); +} + interface FilesConnectorConfig { defaultVolume?: string; timeout?: number; @@ -102,12 +134,18 @@ export class FilesConnector { const startTime = Date.now(); let success = false; + // Pull any ambient attributes set by `runWithFilesSpanAttributes` (e.g. + // `files.auth_mode` from the plugin layer). Static `attributes` win on + // collision so callers can override per-call. + const ambient = filesSpanAttributesStorage.getStore(); + return this.telemetry.startActiveSpan( `files.${operation}`, { kind: SpanKind.CLIENT, attributes: { "files.operation": operation, + ...(ambient ?? {}), ...attributes, }, }, diff --git a/packages/appkit/src/plugins/files/plugin.ts b/packages/appkit/src/plugins/files/plugin.ts index 75f2e14db..13855e406 100644 --- a/packages/appkit/src/plugins/files/plugin.ts +++ b/packages/appkit/src/plugins/files/plugin.ts @@ -5,11 +5,19 @@ import type express from "express"; import type { IAppRouter, PluginExecutionSettings } from "shared"; import { contentTypeFromPath, + FILES_MAX_READ_SIZE, FilesConnector, isSafeInlineContentType, + runWithFilesSpanAttributes, validateCustomContentTypes, } from "../../connectors/files"; -import { getCurrentUserId, getWorkspaceClient } from "../../context"; +import { + getCurrentUserId, + getWorkspaceClient, + runInUserContext, + ServiceContext, + type UserContext, +} from "../../context"; import { AuthenticationError } from "../../errors"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; @@ -31,7 +39,6 @@ import { policy, } from "./policy"; import type { - DownloadResponse, FilesExport, IFilesConfig, VolumeAPI, @@ -80,9 +87,33 @@ export class FilesPlugin extends Plugin { /** * Generates resource requirements dynamically from discovered + configured volumes. * Each volume key maps to a `DATABRICKS_VOLUME_{KEY_UPPERCASE}` env var. + * + * ## Per-volume permission scope (SP vs OBO) + * + * The returned manifest entries describe a single permission grant per + * volume, but the *grantee* depends on the volume's `auth` setting at + * runtime — and that distinction is **not** expressed in the manifest + * today: + * + * - **Service-principal volumes** (the default, `auth: "service-principal"`): + * the app's service principal needs `WRITE_VOLUME` (or read-equivalent) + * on the UC volume. This matches the manifest entry as written. + * - **On-behalf-of-user volumes** (`auth: "on-behalf-of-user"`): SDK calls + * execute as the **end user**, so the *user* — not the SP — must hold + * `WRITE_VOLUME` (or read-equivalent) on the UC volume. The SP only + * needs to be allowed to mint user-token requests; it does not need + * direct volume permissions. + * + * The static manifest cannot currently express this per-volume split, so + * callers configuring OBO volumes must communicate the per-user permission + * requirement out-of-band (docs, runbooks, deployment scripts) until the + * manifest schema gains a per-volume auth scope field. */ static getResourceRequirements(config: IFilesConfig): ResourceRequirement[] { const volumes = FilesPlugin.discoverVolumes(config); + // TODO: extend plugin-manifest.schema.json to express per-volume auth + // scope so OBO volumes can declare end-user-required permissions in the + // manifest itself. return Object.keys(volumes).map((key) => ({ type: ResourceType.VOLUME, alias: `volume-${key}`, @@ -100,21 +131,70 @@ export class FilesPlugin extends Plugin { } /** - * Extract user identity from the request. - * Falls back to `getCurrentUserId()` in development mode. + * Extraction for `VolumeHandle.asUser(req)`. In production we require BOTH + * `x-forwarded-user` and `x-forwarded-access-token`, and throw + * `AuthenticationError.missingToken` if either is missing — otherwise a + * request with only `x-forwarded-user: alice` would let policies see Alice + * as a "real user" (`isServicePrincipal: false`) while the SDK call below + * falls through to the SP client because `_buildUserContextOrNull` returns + * `null`. Net effect: policy approves the user, SDK runs as SP, privilege + * confusion (CWE-639/863). + * + * In development (`NODE_ENV === "development"`) we keep a local-loop + * convenience: if either header is missing we emit a single warning and + * return a policy user explicitly marked `isServicePrincipal: true`, so + * even in dev a `usersOnly`-style policy that gates on + * `!user.isServicePrincipal` cannot be tricked. The matching SDK execution + * path also falls through to the SP client (no `runInUserContext` wrap), + * so the policy user and the SDK identity stay aligned. */ private _extractUser(req: express.Request): FilePolicyUser { const userId = req.header("x-forwarded-user")?.trim(); - if (userId) return { id: userId }; + const token = req.header("x-forwarded-access-token")?.trim(); + if (userId && token) return { id: userId, isServicePrincipal: false }; if (process.env.NODE_ENV === "development") { logger.warn( - "No x-forwarded-user header — falling back to service principal identity for policy checks. " + - "Ensure your proxy forwards user headers to test per-user policies.", + "asUser(req) called without complete x-forwarded-user + x-forwarded-access-token headers — " + + "falling back to service principal identity (dev mode). " + + "In production this request would 401.", + ); + return { id: getCurrentUserId(), isServicePrincipal: true }; + } + if (!token) { + throw AuthenticationError.missingToken( + "Missing x-forwarded-access-token header for asUser(req). Both x-forwarded-user and x-forwarded-access-token are required.", + ); + } + throw AuthenticationError.missingToken( + "Missing x-forwarded-user header. Cannot resolve user ID for asUser(req).", + ); + } + + /** + * Extraction for OBO (on-behalf-of-user) volumes on the HTTP path. Both the + * `x-forwarded-access-token` and `x-forwarded-user` headers must be present + * for a valid end-user identity. When the token is missing: + * - In production we throw `AuthenticationError.missingToken` so the route + * responds with 401 (no SDK call is made). + * - In development (`NODE_ENV === "development"`) we emit a single warning + * and fall back to the service principal identity so local testing + * without a reverse proxy continues to work. + */ + private _extractObiUser(req: express.Request): FilePolicyUser { + const token = req.header("x-forwarded-access-token")?.trim(); + const userId = req.header("x-forwarded-user")?.trim(); + if (token && userId) { + return { id: userId, isServicePrincipal: false }; + } + if (!token && process.env.NODE_ENV === "development") { + logger.warn( + "OBO volume requested without x-forwarded-access-token — falling back to service principal identity (dev mode). " + + "In production this request would 401.", ); - return { id: getCurrentUserId() }; + return { id: getCurrentUserId(), isServicePrincipal: true }; } throw AuthenticationError.missingToken( - "Missing x-forwarded-user header. Cannot resolve user ID.", + "Missing x-forwarded-access-token header for on-behalf-of-user volume.", ); } @@ -152,8 +232,20 @@ export class FilesPlugin extends Plugin { /** * HTTP-level wrapper around `_checkPolicy`. - * Extracts user (401 on failure), runs policy (403 on denial). + * Selects the policy user based on the volume's auth mode (resolved via + * `_resolveAuth`): + * - `"service-principal"` (default): use the `x-forwarded-user` header when + * present, otherwise fall back to the SP identity (legacy behavior). + * - `"on-behalf-of-user"`: require `x-forwarded-access-token` (and the + * matching `x-forwarded-user`); 401 in production when missing, + * dev-fallback to SP identity in development. + * Then runs the volume policy (403 on denial, 500 on unexpected error). * Returns `true` if the request may proceed, `false` if a response was sent. + * + * NOTE: This method only selects which identity the policy sees. It does + * NOT change which `WorkspaceClient` the SDK calls execute against — that + * still uses the service principal until a later phase wires the actual + * OBO `runInUserContext` plumbing. */ private async _enforcePolicy( req: express.Request, @@ -165,10 +257,28 @@ export class FilesPlugin extends Plugin { ): Promise { let user: FilePolicyUser; try { - user = this._extractUser(req); + const auth = this._resolveAuth(volumeKey); + if (auth === "on-behalf-of-user") { + user = this._extractObiUser(req); + } else { + const headerUserId = req.header("x-forwarded-user")?.trim(); + if (headerUserId) { + user = { id: headerUserId }; + } else { + logger.debug( + "No x-forwarded-user header — proceeding with service principal identity for policy evaluation.", + ); + user = { id: getCurrentUserId(), isServicePrincipal: true }; + } + } } catch (error) { if (error instanceof AuthenticationError) { - res.status(401).json({ error: error.message, plugin: this.name }); + logger.warn( + "Authentication failed during policy evaluation for volume %s: %O", + volumeKey, + error, + ); + res.status(401).json({ error: "Unauthorized", plugin: this.name }); return false; } throw error; @@ -212,9 +322,11 @@ export class FilesPlugin extends Plugin { // Merge per-volume config with plugin-level defaults const mergedConfig: VolumeConfig = { maxUploadSize: volumeCfg.maxUploadSize ?? config.maxUploadSize, + maxReadSize: volumeCfg.maxReadSize ?? config.maxReadSize, customContentTypes: volumeCfg.customContentTypes ?? config.customContentTypes, policy: volumeCfg.policy ?? policy.publicRead(), + auth: volumeCfg.auth, }; this.volumeConfigs[key] = mergedConfig; @@ -231,6 +343,7 @@ export class FilesPlugin extends Plugin { if (!volumes[key].policy) { logger.warn( 'Volume "%s" has no explicit policy — defaulting to publicRead(). ' + + "This also matches header-less HTTP requests (which run as the service principal). " + "Set a policy in files({ volumes: { %s: { policy: ... } } }) to silence this warning.", key, key, @@ -384,6 +497,34 @@ export class FilesPlugin extends Plugin { return { connector, volumeKey }; } + /** + * Extract `req.query.path` as a single string when present. + * + * Express coerces repeated query parameters (`?path=a&path=b`) to a string + * array and dotted/nested params (`?path[k]=v`) to an object. Reject those + * with `400` instead of letting non-string values reach `_isValidPath` / + * `connector.resolvePath`, which would misbehave on arrays or objects. + * + * Returns `{ path }` (with `path` either a string or `undefined` when the + * query parameter was absent) on success. Returns `undefined` and writes a + * `400` response when the value is not a single string — callers must + * check the return for `undefined` before continuing. + */ + private _readPathQuery( + req: express.Request, + res: express.Response, + ): { path: string | undefined } | undefined { + const value = req.query.path; + if (value === undefined || typeof value === "string") { + return { path: value }; + } + res.status(400).json({ + error: "path query parameter must be a single string", + plugin: this.name, + }); + return undefined; + } + /** * Validate a file/directory path from user input. * Returns `true` if valid, or an error message string if invalid. @@ -398,38 +539,141 @@ export class FilesPlugin extends Plugin { private _readSettings( cacheKey: (string | number | object)[], + authMode: "service-principal" | "on-behalf-of-user", ): PluginExecutionSettings { + // OBO volumes: disable list/read cache. The cache layer is keyed by + // `getCurrentUserId()`, so user A's writes can only invalidate user A's + // cache entry — user B would continue to see stale data for the same + // volume/path until TTL. Disabling caching trades read performance for + // correctness; the alternative is a per-(volume, path) generation + // counter folded into the cache key on writes (a future enhancement). + const isObo = authMode === "on-behalf-of-user"; + const cache = isObo + ? { ...FILES_READ_DEFAULTS.cache, enabled: false, cacheKey } + : { ...FILES_READ_DEFAULTS.cache, cacheKey }; return { default: { ...FILES_READ_DEFAULTS, - cache: { ...FILES_READ_DEFAULTS.cache, cacheKey }, + cache, + telemetryInterceptor: { + attributes: this._authModeAttributes(authMode), + }, + }, + }; + } + + private _writeSettings( + authMode: "service-principal" | "on-behalf-of-user", + ): PluginExecutionSettings { + return { + default: { + ...FILES_WRITE_DEFAULTS, + telemetryInterceptor: { + attributes: this._authModeAttributes(authMode), + }, + }, + }; + } + + private _downloadSettings( + authMode: "service-principal" | "on-behalf-of-user", + ): PluginExecutionSettings { + return { + default: { + ...FILES_DOWNLOAD_DEFAULTS, + telemetryInterceptor: { + attributes: this._authModeAttributes(authMode), + }, }, }; } /** * Invalidate cached list entries for a directory after a write operation. - * Uses the same cache-key format as `_handleList`: resolved path for - * subdirectories, `"__root__"` for the volume root. + * Must produce the SAME cache-key shape that `_handleList` stored under. + * `_handleList` builds its key from `req.query.path`: when `path` is + * provided it uses `connector.resolvePath(path)`, otherwise it uses the + * sentinel `"__root__"`. The invalidation here must derive the matching + * directory from the FILE path being written: + * + * - `"/Volumes/c/s/v/foo/bar.txt"` → `parentDirectory` returns + * `"/Volumes/c/s/v/foo"` → resolved path key. + * - `"/bar.txt"` and `"bar.txt"` → root-level files: matching list cache + * was a rootless `list()` call → `"__root__"` sentinel. + * + * On OBO volumes the read cache is disabled (see `_readSettings`), so + * invalidation is a no-op here for `mode === "on-behalf-of-user"`. The + * cache layer is keyed by `getCurrentUserId()`, so user A's writes can + * only invalidate user A's cache entry — user B would otherwise see stale + * data for the same volume/path until TTL. Disabling the cache on OBO + * trades read performance for correctness; the alternative is a + * per-(volume, path) generation counter folded into the cache key on + * writes (a future enhancement). * - * Cache keys include `getCurrentUserId()` — must match the identity used - * by `this.execute()` in `_handleList`. Both run in service-principal - * context; wrapping either in `runInUserContext` would break invalidation. + * Best-effort: a thrown `connector.resolvePath` (e.g. on malformed input) + * is swallowed here. Invalidation is purely an optimization signal — a + * missed delete only costs read freshness, not correctness, and + * propagating the error would convert a successful write into an HTTP + * 500. + * + * Returns a `Promise`; callers MUST `await` this before sending the + * HTTP success response so a follow-up `GET /list` issued in the same tick + * cannot race the underlying `cache.delete()` and observe stale data. */ - private _invalidateListCache( + private async _invalidateListCache( volumeKey: string, - parentPath: string, + writtenPath: string, connector: FilesConnector, - ): void { - const parent = parentDirectory(parentPath); - const cachePathSegment = parent - ? connector.resolvePath(parent) - : "__root__"; - const listKey = this.cache.generateKey( - [`files:${volumeKey}:list`, cachePathSegment], - getCurrentUserId(), - ); - this.cache.delete(listKey); + mode: "service-principal" | "on-behalf-of-user" = "service-principal", + ): Promise { + if (mode === "on-behalf-of-user") { + // OBO read cache is disabled — nothing to invalidate. Skipping here + // also prevents accidentally caching a wrong-namespace delete that + // would mask the missing cross-user invalidation if the cache were + // ever re-enabled. + return; + } + const parent = parentDirectory(writtenPath); + // The list cache stored under `"__root__"` whenever the matching read + // came from a rootless `list()` (no `?path=`). `parentDirectory` + // returns `"/"` for root-level files like `/bar.txt`, and `""` for + // relative root-level files like `bar.txt`. Both map to the sentinel. + const isRootLevel = !parent || parent === "/"; + const userKey = getCurrentUserId(); + const tryDelete = async (segment: string): Promise => { + try { + await this.cache.delete( + this.cache.generateKey([`files:${volumeKey}:list`, segment], userKey), + ); + } catch (err) { + logger.debug( + "List-cache invalidation failed for volume=%s segment=%s: %O", + volumeKey, + segment, + err, + ); + } + }; + + if (isRootLevel) { + // A rootless `list()` produced the `"__root__"` key. + await tryDelete("__root__"); + return; + } + + let resolved: string; + try { + resolved = connector.resolvePath(parent); + } catch (err) { + logger.debug( + "List-cache invalidation: resolvePath(%s) failed for volume=%s: %O", + parent, + volumeKey, + err, + ); + return; + } + await tryDelete(resolved); } private _handleApiError( @@ -445,17 +689,19 @@ export class FilesPlugin extends Plugin { return; } if (error instanceof AuthenticationError) { - res.status(401).json({ - error: error.message, - plugin: this.name, - }); + logger.warn("Authentication failed in %s: %O", this.name, error); + res.status(401).json({ error: "Unauthorized", plugin: this.name }); return; } if (error instanceof ApiError) { const status = error.statusCode ?? 500; if (status >= 400 && status < 500) { + // Don't reflect raw SDK error.message — it can leak internal volume + // paths, hostnames, or principal names. Use the standard HTTP status + // text for the public body and log the full error server-side. + logger.warn("Upstream %d in %s: %O", status, this.name, error); res.status(status).json({ - error: error.message, + error: STATUS_CODES[status] ?? "Client Error", statusCode: status, plugin: this.name, }); @@ -482,28 +728,36 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string | undefined; + const query = this._readPathQuery(req, res); + if (!query) return; + const path = query.path; if (!(await this._enforcePolicy(req, res, volumeKey, "list", path ?? "/"))) return; - try { - const result = await this.execute( - async () => connector.list(getWorkspaceClient(), path), - this._readSettings([ - `files:${volumeKey}:list`, - path ? connector.resolvePath(path) : "__root__", - ]), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const result = await this.execute( + async () => connector.list(getWorkspaceClient(), path), + this._readSettings( + [ + `files:${volumeKey}:list`, + path ? connector.resolvePath(path) : "__root__", + ], + mode, + ), + ); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json(result.data); + } catch (error) { + this._handleApiError(res, error, "List failed"); } - res.json(result.data); - } catch (error) { - this._handleApiError(res, error, "List failed"); - } + }); } private async _handleRead( @@ -512,33 +766,89 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; - const valid = this._isValidPath(path); + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; if (!(await this._enforcePolicy(req, res, volumeKey, "read", path))) return; - try { - const result = await this.execute( - async () => connector.read(getWorkspaceClient(), path), - this._readSettings([ - `files:${volumeKey}:read`, - connector.resolvePath(path), - ]), - ); + const volumeCfg = this.volumeConfigs[volumeKey]; + const maxReadSize = volumeCfg.maxReadSize ?? FILES_MAX_READ_SIZE; + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + + await this._runWithAuth(userCtx, async () => { + try { + // Stream the file body, capping at `maxReadSize` to avoid buffering + // arbitrary-size files. Uses download-tier settings (no cache) — + // `/read` no longer participates in the read-tier cache. Programmatic + // callers wanting a cached small-file read should use the SDK + // `volume.read(path, { maxSize })` method directly. + const response = await this.execute( + async () => connector.download(getWorkspaceClient(), path), + this._downloadSettings(mode), + ); + if (!response.ok) { + this._sendStatusError(res, response.status); + return; + } + if (!response.data.contents) { + res.type("text/plain").send(""); + return; + } - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + let bytesSent = 0; + const limited = response.data.contents.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + bytesSent += chunk.byteLength; + if (bytesSent > maxReadSize) { + controller.error( + new Error( + `File exceeds maxReadSize (${maxReadSize} bytes). Use /download for large files.`, + ), + ); + return; + } + controller.enqueue(chunk); + }, + }), + ); + + res.type("text/plain"); + const nodeStream = Readable.fromWeb( + limited as import("node:stream/web").ReadableStream, + ); + nodeStream.on("error", (err) => { + if ( + err instanceof Error && + err.message.includes("exceeds maxReadSize") + ) { + if (!res.headersSent) { + res.status(413).json({ error: err.message, plugin: this.name }); + return; + } + res.destroy(err); + return; + } + logger.error("Stream error during read: %O", err); + if (!res.headersSent) { + this._sendStatusError(res, 500); + } else { + res.destroy(); + } + }); + nodeStream.pipe(res); + } catch (error) { + this._handleApiError(res, error, "Read failed"); } - res.type("text/plain").send(result.data); - } catch (error) { - this._handleApiError(res, error, "Read failed"); - } + }); } private async _handleDownload( @@ -575,78 +885,82 @@ export class FilesPlugin extends Plugin { volumeKey: string, opts: { mode: "download" | "raw" }, ): Promise { - const path = req.query.path as string; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; - const valid = this._isValidPath(path); + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; if (!(await this._enforcePolicy(req, res, volumeKey, opts.mode, path))) return; const label = opts.mode === "download" ? "Download" : "Raw fetch"; const volumeCfg = this.volumeConfigs[volumeKey]; + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + + await this._runWithAuth(userCtx, async () => { + try { + const settings = this._downloadSettings(mode); + const response = await this.execute( + async () => connector.download(getWorkspaceClient(), path), + settings, + ); - try { - const settings: PluginExecutionSettings = { - default: FILES_DOWNLOAD_DEFAULTS, - }; - const response = await this.execute( - async () => connector.download(getWorkspaceClient(), path), - settings, - ); - - if (!response.ok) { - this._sendStatusError(res, response.status); - return; - } - - const resolvedType = contentTypeFromPath( - path, - undefined, - volumeCfg.customContentTypes, - ); - const fileName = sanitizeFilename(path.split("/").pop() ?? "download"); - - res.setHeader("Content-Type", resolvedType); - res.setHeader("X-Content-Type-Options", "nosniff"); + if (!response.ok) { + this._sendStatusError(res, response.status); + return; + } - if (opts.mode === "raw") { - res.setHeader("Content-Security-Policy", "sandbox"); - if (!isSafeInlineContentType(resolvedType)) { + const resolvedType = contentTypeFromPath( + path, + undefined, + volumeCfg.customContentTypes, + ); + const fileName = sanitizeFilename(path.split("/").pop() ?? "download"); + + res.setHeader("Content-Type", resolvedType); + res.setHeader("X-Content-Type-Options", "nosniff"); + + if (opts.mode === "raw") { + res.setHeader("Content-Security-Policy", "sandbox"); + if (!isSafeInlineContentType(resolvedType)) { + res.setHeader( + "Content-Disposition", + `attachment; filename="${fileName}"`, + ); + } + } else { res.setHeader( "Content-Disposition", `attachment; filename="${fileName}"`, ); } - } else { - res.setHeader( - "Content-Disposition", - `attachment; filename="${fileName}"`, - ); - } - if (response.data.contents) { - const nodeStream = Readable.fromWeb( - response.data.contents as import("node:stream/web").ReadableStream, - ); - nodeStream.on("error", (err) => { - logger.error("Stream error during %s: %O", opts.mode, err); - if (!res.headersSent) { - this._sendStatusError(res, 500); - } else { - res.destroy(); - } - }); - nodeStream.pipe(res); - } else { - res.end(); + if (response.data.contents) { + const nodeStream = Readable.fromWeb( + response.data.contents as import("node:stream/web").ReadableStream, + ); + nodeStream.on("error", (err) => { + logger.error("Stream error during %s: %O", opts.mode, err); + if (!res.headersSent) { + this._sendStatusError(res, 500); + } else { + res.destroy(); + } + }); + nodeStream.pipe(res); + } else { + res.end(); + } + } catch (error) { + this._handleApiError(res, error, `${label} failed`); } - } catch (error) { - this._handleApiError(res, error, `${label} failed`); - } + }); } private async _handleExists( @@ -655,34 +969,40 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; - const valid = this._isValidPath(path); + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; if (!(await this._enforcePolicy(req, res, volumeKey, "exists", path))) return; - try { - const result = await this.execute( - async () => connector.exists(getWorkspaceClient(), path), - this._readSettings([ - `files:${volumeKey}:exists`, - connector.resolvePath(path), - ]), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const result = await this.execute( + async () => connector.exists(getWorkspaceClient(), path), + this._readSettings( + [`files:${volumeKey}:exists`, connector.resolvePath(path)], + mode, + ), + ); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json({ exists: result.data }); + } catch (error) { + this._handleApiError(res, error, "Exists check failed"); } - res.json({ exists: result.data }); - } catch (error) { - this._handleApiError(res, error, "Exists check failed"); - } + }); } private async _handleMetadata( @@ -691,34 +1011,40 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; - const valid = this._isValidPath(path); + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; if (!(await this._enforcePolicy(req, res, volumeKey, "metadata", path))) return; - try { - const result = await this.execute( - async () => connector.metadata(getWorkspaceClient(), path), - this._readSettings([ - `files:${volumeKey}:metadata`, - connector.resolvePath(path), - ]), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const result = await this.execute( + async () => connector.metadata(getWorkspaceClient(), path), + this._readSettings( + [`files:${volumeKey}:metadata`, connector.resolvePath(path)], + mode, + ), + ); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json(result.data); + } catch (error) { + this._handleApiError(res, error, "Metadata fetch failed"); } - res.json(result.data); - } catch (error) { - this._handleApiError(res, error, "Metadata fetch failed"); - } + }); } private async _handlePreview( @@ -727,34 +1053,40 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; - const valid = this._isValidPath(path); + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; if (!(await this._enforcePolicy(req, res, volumeKey, "preview", path))) return; - try { - const result = await this.execute( - async () => connector.preview(getWorkspaceClient(), path), - this._readSettings([ - `files:${volumeKey}:preview`, - connector.resolvePath(path), - ]), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const result = await this.execute( + async () => connector.preview(getWorkspaceClient(), path), + this._readSettings( + [`files:${volumeKey}:preview`, connector.resolvePath(path)], + mode, + ), + ); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json(result.data); + } catch (error) { + this._handleApiError(res, error, "Preview failed"); } - res.json(result.data); - } catch (error) { - this._handleApiError(res, error, "Preview failed"); - } + }); } private async _handleUpload( @@ -763,12 +1095,15 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const path = req.query.path as string; - const valid = this._isValidPath(path); + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; + const valid = this._isValidPath(rawPath); if (valid !== true) { res.status(400).json({ error: valid, plugin: this.name }); return; } + const path = rawPath as string; const volumeCfg = this.volumeConfigs[volumeKey]; const maxSize = volumeCfg.maxUploadSize ?? FILES_MAX_UPLOAD_SIZE; @@ -803,70 +1138,101 @@ export class FilesPlugin extends Plugin { logger.debug(req, "Upload started: volume=%s path=%s", volumeKey, path); - try { - const rawStream: ReadableStream = Readable.toWeb(req); - - let bytesReceived = 0; - const webStream = rawStream.pipeThrough( - new TransformStream({ - transform(chunk, controller) { - bytesReceived += chunk.byteLength; - if (bytesReceived > maxSize) { - controller.error( - new Error( - `Upload stream exceeds maximum allowed size (${maxSize} bytes)`, - ), - ); - return; - } - controller.enqueue(chunk); - }, - }), - ); - - logger.debug( - req, - "Upload body received: volume=%s path=%s, size=%d bytes", - volumeKey, - path, - contentLength ?? 0, - ); - const settings: PluginExecutionSettings = { - default: FILES_WRITE_DEFAULTS, - }; - const result = await this.trackWrite(() => - this.execute(async () => { - await connector.upload(getWorkspaceClient(), path, webStream); - return { success: true as const }; - }, settings), - ); - - this._invalidateListCache(volumeKey, path, connector); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const rawStream: ReadableStream = Readable.toWeb(req); + + let bytesReceived = 0; + const webStream = rawStream.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + bytesReceived += chunk.byteLength; + if (bytesReceived > maxSize) { + controller.error( + new Error( + `Upload stream exceeds maximum allowed size (${maxSize} bytes)`, + ), + ); + return; + } + // When the client declared a Content-Length, the policy was + // gated on that value (lines above pass `size: contentLength` + // to `_enforcePolicy`). Refuse bytes beyond the declared size + // so an attacker cannot bypass a per-user policy by sending a + // small Content-Length and then streaming up to maxSize. + if ( + contentLength !== undefined && + bytesReceived > contentLength + ) { + controller.error( + new Error( + `Upload stream exceeds declared Content-Length (${contentLength} bytes)`, + ), + ); + return; + } + controller.enqueue(chunk); + }, + }), + ); - if (!result.ok) { - logger.error( + logger.debug( req, - "Upload failed: volume=%s path=%s, size=%d bytes", + "Upload body received: volume=%s path=%s, size=%d bytes", volumeKey, path, contentLength ?? 0, ); - this._sendStatusError(res, result.status); - return; - } + const settings = this._writeSettings(mode); + // The connector's `upload` resolves `getWorkspaceClient()` and + // `client.config.authenticate(headers)` synchronously inside this + // callback. When `_runWithAuth` wraps us in `runInUserContext`, that + // chain produces user-token Authorization headers on the outgoing + // `fetch PUT`. The OBO upload-headers test pins this contract. + const result = await this.trackWrite(() => + this.execute(async () => { + await connector.upload(getWorkspaceClient(), path, webStream); + return { success: true as const }; + }, settings), + ); - logger.debug(req, "Upload complete: volume=%s path=%s", volumeKey, path); - res.json(result.data); - } catch (error) { - if ( - error instanceof Error && - error.message.includes("exceeds maximum allowed size") - ) { - res.status(413).json({ error: error.message, plugin: this.name }); - return; + // Awaited before sending the response so that a follow-up + // `GET /list` issued in the same tick cannot race the + // underlying `cache.delete()` and observe pre-write data. + await this._invalidateListCache(volumeKey, path, connector, mode); + + if (!result.ok) { + logger.error( + req, + "Upload failed: volume=%s path=%s, size=%d bytes", + volumeKey, + path, + contentLength ?? 0, + ); + this._sendStatusError(res, result.status); + return; + } + + logger.debug( + req, + "Upload complete: volume=%s path=%s", + volumeKey, + path, + ); + res.json(result.data); + } catch (error) { + if ( + error instanceof Error && + (error.message.includes("exceeds maximum allowed size") || + error.message.includes("exceeds declared Content-Length")) + ) { + res.status(413).json({ error: error.message, plugin: this.name }); + return; + } + this._handleApiError(res, error, "Upload failed"); } - this._handleApiError(res, error, "Upload failed"); - } + }); } private async _handleMkdir( @@ -887,28 +1253,32 @@ export class FilesPlugin extends Plugin { if (!(await this._enforcePolicy(req, res, volumeKey, "mkdir", dirPath))) return; - try { - const settings: PluginExecutionSettings = { - default: FILES_WRITE_DEFAULTS, - }; - const result = await this.trackWrite(() => - this.execute(async () => { - await connector.createDirectory(getWorkspaceClient(), dirPath); - return { success: true as const }; - }, settings), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const settings = this._writeSettings(mode); + const result = await this.trackWrite(() => + this.execute(async () => { + await connector.createDirectory(getWorkspaceClient(), dirPath); + return { success: true as const }; + }, settings), + ); - this._invalidateListCache(volumeKey, dirPath, connector); + // Awaited before sending the response so that a follow-up + // `GET /list` issued in the same tick cannot race the + // underlying `cache.delete()` and observe pre-write data. + await this._invalidateListCache(volumeKey, dirPath, connector, mode); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; - } + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } - res.json(result.data); - } catch (error) { - this._handleApiError(res, error, "Create directory failed"); - } + res.json(result.data); + } catch (error) { + this._handleApiError(res, error, "Create directory failed"); + } + }); } private async _handleDelete( @@ -917,7 +1287,9 @@ export class FilesPlugin extends Plugin { connector: FilesConnector, volumeKey: string, ): Promise { - const rawPath = req.query.path as string | undefined; + const query = this._readPathQuery(req, res); + if (!query) return; + const rawPath = query.path; const valid = this._isValidPath(rawPath); if (valid !== true) { @@ -929,51 +1301,240 @@ export class FilesPlugin extends Plugin { if (!(await this._enforcePolicy(req, res, volumeKey, "delete", path))) return; - try { - const settings: PluginExecutionSettings = { - default: FILES_WRITE_DEFAULTS, - }; - const result = await this.trackWrite(() => - this.execute(async () => { - await connector.delete(getWorkspaceClient(), path); - return { success: true as const }; - }, settings), - ); + const { mode, userCtx } = this._resolveAuthForRequest(req, volumeKey); + await this._runWithAuth(userCtx, async () => { + try { + const settings = this._writeSettings(mode); + const result = await this.trackWrite(() => + this.execute(async () => { + await connector.delete(getWorkspaceClient(), path); + return { success: true as const }; + }, settings), + ); - this._invalidateListCache(volumeKey, path, connector); + // Awaited before sending the response so that a follow-up + // `GET /list` issued in the same tick cannot race the + // underlying `cache.delete()` and observe pre-write data. + await this._invalidateListCache(volumeKey, path, connector, mode); - if (!result.ok) { - this._sendStatusError(res, result.status); - return; + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + + res.json(result.data); + } catch (error) { + this._handleApiError(res, error, "Delete failed"); } + }); + } - res.json(result.data); - } catch (error) { - this._handleApiError(res, error, "Delete failed"); + private _resolveAuth( + volumeKey: string, + ): "service-principal" | "on-behalf-of-user" { + return ( + this.volumeConfigs[volumeKey]?.auth ?? + this.config.auth ?? + "service-principal" + ); + } + + /** + * Build a `UserContext` from request headers when both + * `x-forwarded-access-token` and `x-forwarded-user` are present, otherwise + * return `null`. Used by OBO route handlers to wrap SDK calls in the + * end-user's identity. A `null` result means "fall back to the service + * principal client" — for OBO volumes in production, `_enforcePolicy` will + * already have responded 401 before we get here, so `null` is reachable + * only on the dev-fallback path. + */ + private _buildUserContextOrNull(req: express.Request): UserContext | null { + const token = req.header("x-forwarded-access-token")?.trim(); + const userId = req.header("x-forwarded-user")?.trim(); + if (!token || !userId) return null; + return ServiceContext.createUserContext(token, userId); + } + + /** + * Build the telemetry attribute hash for the `files.auth_mode` span + * attribute. The value reflects what operationally happened — i.e. + * whether `runInUserContext` actually wrapped the SDK call: + * - HTTP route on OBO volume + valid token → `"on-behalf-of-user"`. + * - HTTP route on OBO volume + dev-fallback (no token) → + * `"service-principal"` (the route falls through to the SP client). + * - HTTP route on SP volume → `"service-principal"`. + * - `asUser(req)` programmatic calls with a real user context → + * `"on-behalf-of-user"`. + * - Any unwrapped path → `"service-principal"`. + */ + private _authModeAttributes( + authMode: "service-principal" | "on-behalf-of-user", + ): { "files.auth_mode": string } { + return { "files.auth_mode": authMode }; + } + + /** + * One-shot resolver for HTTP route handlers. Builds the request's + * `UserContext` AT MOST ONCE (when the volume is OBO and the headers are + * present) and returns both the operationally-effective auth mode and the + * pre-built `UserContext`. + * + * Handlers thread the `userCtx` into `_runWithAuth(userCtx, fn)` to avoid + * a second `ServiceContext.createUserContext()` allocation — that call + * builds a fresh `WorkspaceClient` per invocation, so doing it twice per + * request was pure throwaway overhead. + */ + private _resolveAuthForRequest( + req: express.Request, + volumeKey: string, + ): { + mode: "service-principal" | "on-behalf-of-user"; + userCtx: UserContext | null; + } { + if (this._resolveAuth(volumeKey) !== "on-behalf-of-user") { + return { mode: "service-principal", userCtx: null }; } + const userCtx = this._buildUserContextOrNull(req); + return userCtx + ? { mode: "on-behalf-of-user", userCtx } + : { mode: "service-principal", userCtx: null }; } /** - * Creates a VolumeAPI for a specific volume key. + * Run `fn` under the correct execution context. + * - `userCtx` is `null`: invokes `fn` directly so the service-principal + * `WorkspaceClient` and `getCurrentUserId()` are used — identical + * behavior to pre-OBO releases. This covers both SP volumes and the + * OBO dev-fallback path (where headers were missing). + * - `userCtx` is a `UserContext`: wraps `fn` in `runInUserContext(userCtx)`, + * so SDK calls execute as the end user and `getCurrentUserId()` (and + * therefore cache keys) resolve to the user's ID. + * + * The caller is responsible for building `userCtx` exactly once per + * request via `_resolveAuthForRequest`; this signature deliberately does + * NOT take a `req` so it cannot accidentally re-build the context. + */ + private async _runWithAuth( + userCtx: UserContext | null, + fn: () => Promise, + ): Promise { + if (userCtx) { + return runInUserContext(userCtx, fn); + } + return fn(); + } + + /** + * Tag the span that `FilesConnector.` opens with + * `files.auth_mode`. Programmatic VolumeAPI methods bypass + * `this.execute(...)` (and therefore the `TelemetryInterceptor`), so the + * connector's own `files.` span is the natural place to land + * this attribute. Rather than opening a parent `files.` span + * (which would duplicate the connector's span — same name, doubled + * allocation/export), we propagate the attribute via AsyncLocalStorage + * and let the connector merge it into its existing span at creation + * time. * - * By default, enforces the volume's policy before each operation. - * Pass `bypassPolicy: true` to skip policy checks — useful for - * background jobs or migrations that should bypass user-facing policies. + * The `operation` parameter is unused by the propagation mechanism (the + * connector knows its own operation), but kept in the signature for API + * stability with the previous span-creation form. + */ + private _withAuthModeAttributes( + _operation: string, + authMode: "service-principal" | "on-behalf-of-user", + fn: () => Promise, + ): Promise { + return runWithFilesSpanAttributes(this._authModeAttributes(authMode), fn); + } + + /** + * Wrap each `VolumeAPI` method so the `FilesConnector` span it produces is + * tagged with `files.auth_mode = "service-principal"`. Used for + * programmatic calls that don't go through `asUser(req)`. * - * @security When `bypassPolicy` is `true`, no policy enforcement runs. - * Do not expose bypassed APIs to HTTP routes or end-user code paths. + * The attribute is attached to the connector's existing span via + * AsyncLocalStorage propagation (see `_withAuthModeAttributes`); no + * additional parent span is opened, so each call produces exactly one + * `files.` span instead of two. + */ + private _wrapVolumeAPIWithSPSpan(api: VolumeAPI): VolumeAPI { + const wrap = + ( + operation: string, + fn: (...args: Args) => Promise, + ): ((...args: Args) => Promise) => + (...args: Args) => + this._withAuthModeAttributes(operation, "service-principal", () => + fn(...args), + ); + + return { + list: wrap("list", api.list), + read: wrap("read", api.read), + download: wrap("download", api.download), + exists: wrap("exists", api.exists), + metadata: wrap("metadata", api.metadata), + upload: wrap("upload", api.upload), + createDirectory: wrap("createDirectory", api.createDirectory), + delete: wrap("delete", api.delete), + preview: wrap("preview", api.preview), + }; + } + + /** + * Wrap each `VolumeAPI` method so its execution runs inside + * `runInUserContext(userCtx, ...)`. Used by `VolumeHandle.asUser(req)` to + * force the SDK identity to the end user regardless of the volume's + * `auth` setting. The policy check baked into each method (via + * `createVolumeAPI`) runs inside the same scope, so `getCurrentUserId()` + * and any cache `userKey` derived from it also resolve to the user. + * + * Each wrapped invocation tags the connector's span with + * `files.auth_mode = "on-behalf-of-user"` via AsyncLocalStorage + * propagation — no additional parent span is opened. + */ + private _wrapVolumeAPIInUserContext( + api: VolumeAPI, + userCtx: UserContext, + ): VolumeAPI { + const wrap = + ( + operation: string, + fn: (...args: Args) => Promise, + ): ((...args: Args) => Promise) => + (...args: Args) => + this._withAuthModeAttributes(operation, "on-behalf-of-user", () => + runInUserContext(userCtx, () => fn(...args)), + ); + + return { + list: wrap("list", api.list), + read: wrap("read", api.read), + download: wrap("download", api.download), + exists: wrap("exists", api.exists), + metadata: wrap("metadata", api.metadata), + upload: wrap("upload", api.upload), + createDirectory: wrap("createDirectory", api.createDirectory), + delete: wrap("delete", api.delete), + preview: wrap("preview", api.preview), + }; + } + + /** + * Creates a VolumeAPI for a specific volume key. + * + * Enforces the volume's policy before each operation. */ protected createVolumeAPI( volumeKey: string, user: FilePolicyUser, - options?: { bypassPolicy?: boolean }, ): VolumeAPI { const connector = this.volumeConnectors[volumeKey]; - const noop = () => Promise.resolve(); - const check = options?.bypassPolicy - ? noop - : (action: FileAction, path: string, overrides?: Partial) => - this._checkPolicy(volumeKey, action, path, user, overrides); + const check = ( + action: FileAction, + path: string, + overrides?: Partial, + ) => this._checkPolicy(volumeKey, action, path, user, overrides); return { list: async (directoryPath?: string) => { @@ -1051,15 +1612,22 @@ export class FilesPlugin extends Plugin { * Returns the programmatic API for the Files plugin. * Callable with a volume key to get a volume-scoped handle. * - * All operations execute as the service principal. - * Use policies to control per-user access. + * SP volumes (`auth: "service-principal"`, the default) execute as the + * service principal. OBO volumes (`auth: "on-behalf-of-user"`) executed + * through the HTTP routes run as the end user; for programmatic calls + * outside a route, use `asUser(req)` to opt into per-user execution. + * `asUser(req)` is a hard override at the SDK level: it forces every + * subsequent call to execute as the end user inside `runInUserContext`, + * regardless of the volume's `auth` setting. Policies control per-user + * access in either mode. * * @example * ```ts * // Service principal access * appKit.files("uploads").list() * - * // With policy: pass user identity for access control + * // With policy: pass user identity for access control. The SDK call + * // also executes as the user (not the service principal). * appKit.files("uploads").asUser(req).list() * ``` */ @@ -1079,13 +1647,28 @@ export class FilesPlugin extends Plugin { }, isServicePrincipal: true, }; - const spApi = this.createVolumeAPI(volumeKey, spUser); + // Default (non-asUser) programmatic surface: every call is tagged + // with `files.auth_mode = "service-principal"` for telemetry parity + // with the HTTP route path. + const spApi = this._wrapVolumeAPIWithSPSpan( + this.createVolumeAPI(volumeKey, spUser), + ); return { ...spApi, asUser: (req: express.Request) => { const user = this._extractUser(req); - return this.createVolumeAPI(volumeKey, user); + const api = this.createVolumeAPI(volumeKey, user); + // Force OBO at the SDK level regardless of the volume's `auth` + // setting: each method runs inside `runInUserContext` so + // `getWorkspaceClient()` returns the user-token client. When no + // user token is available (only reachable in dev mode after the + // strict `_extractUser` falls back to the SP identity), we skip + // the OBO wrap and instead apply the SP-mode span wrap so trace + // output reflects the actual SP execution. + const userCtx = this._buildUserContextOrNull(req); + if (!userCtx) return this._wrapVolumeAPIWithSPSpan(api); + return this._wrapVolumeAPIInUserContext(api, userCtx); }, }; }; diff --git a/packages/appkit/src/plugins/files/policy.ts b/packages/appkit/src/plugins/files/policy.ts index 87b23f377..be231891b 100644 --- a/packages/appkit/src/plugins/files/policy.ts +++ b/packages/appkit/src/plugins/files/policy.ts @@ -57,8 +57,37 @@ export interface FileResource { /** Minimal user identity passed to the policy function. */ export interface FilePolicyUser { + /** + * Identifier of the requesting caller. For end-user HTTP requests this is + * the value of the `x-forwarded-user` header; for direct SDK calls and + * header-less HTTP requests (which run as the service principal), this is + * the service principal's ID. + */ id: string; - /** `true` when the caller is the service principal (direct SDK call, not `asUser`). */ + /** + * `true` when the call is executing as the service principal — either a + * direct SDK call (`appKit.files(...)`) or an HTTP request that arrived + * without an `x-forwarded-user` / `x-forwarded-access-token` header. + * Policy authors typically check this first to distinguish SP traffic + * from end-user traffic. + * + * The flag reflects the **policy user** the plugin selects, which + * combines the volume's effective `auth` mode with the headers on the + * incoming request. The full matrix: + * + * | Volume `auth` | Path | Headers | `isServicePrincipal` | Notes | + * | --------------------- | ------------------------------ | ----------------------------- | -------------------- | ---------------------------------------------------------------------------------------------- | + * | `service-principal` | HTTP | `x-forwarded-user` present | `false` (or unset) | Pre-OBO behavior. Policy sees the end user but the SDK call still runs as the SP. | + * | `service-principal` | HTTP | no `x-forwarded-user` | `true` | Headerless request — policy and SDK both run as the SP. | + * | `on-behalf-of-user` | HTTP | valid token + user header | `false` | Real end-user execution. Policy sees the user; the SDK call also runs as the user. | + * | `on-behalf-of-user` | HTTP | missing token, dev-fallback | `true` | Only reachable when `NODE_ENV === "development"` (prod returns 401). Treated as SP traffic. | + * | any | Programmatic `asUser(req)` | `x-forwarded-user` present | `false` | `asUser` extracts the user; the SDK call runs as the user inside `runInUserContext`. | + * + * Programmatic calls without `asUser(req)` always set + * `isServicePrincipal: true` because no request is available to derive a + * user identity from. OBO volume defaults apply only to HTTP route + * traffic; for programmatic per-user execution, use `asUser(req)`. + */ isServicePrincipal?: boolean; } diff --git a/packages/appkit/src/plugins/files/tests/plugin.integration.test.ts b/packages/appkit/src/plugins/files/tests/plugin.integration.test.ts index 3c8ff74ec..5c3038502 100644 --- a/packages/appkit/src/plugins/files/tests/plugin.integration.test.ts +++ b/packages/appkit/src/plugins/files/tests/plugin.integration.test.ts @@ -65,11 +65,33 @@ const MOCK_AUTH_HEADERS = { /** Volume key used in all integration tests. */ const VOL = "files"; +/** + * Wait for the supplied server to finish binding, then return the + * OS-assigned port. Required when tests pass `port: 0` to `serverPlugin` + * — `createApp` resolves as soon as `listen()` is invoked but before the + * bind completes, so `server.address()` returns `null` until the + * `listening` event fires. + */ +async function getListeningPort(server: Server): Promise { + const addr = server.address(); + if (addr && typeof addr === "object" && typeof addr.port === "number") { + return addr.port; + } + await new Promise((resolve, reject) => { + server.once("listening", () => resolve()); + server.once("error", (err) => reject(err)); + }); + const ready = server.address(); + if (!ready || typeof ready !== "object") { + throw new Error("Server is listening but address() returned null"); + } + return ready.port; +} + describe("Files Plugin Integration", () => { let server: Server; let baseUrl: string; let serviceContextMock: Awaited>; - const TEST_PORT = 9880; beforeAll(async () => { setupDatabricksEnv({ @@ -84,8 +106,10 @@ describe("Files Plugin Integration", () => { const appkit = await createApp({ plugins: [ + // port: 0 → OS assigns an ephemeral port. Avoids EADDRINUSE + // when concurrent CI runs or stale processes hold a fixed port. serverPlugin({ - port: TEST_PORT, + port: 0, host: "127.0.0.1", }), files(), @@ -93,7 +117,8 @@ describe("Files Plugin Integration", () => { }); server = appkit.server.getServer(); - baseUrl = `http://127.0.0.1:${TEST_PORT}`; + const port = await getListeningPort(server); + baseUrl = `http://127.0.0.1:${port}`; }); afterAll(async () => { @@ -437,15 +462,123 @@ describe("Files Plugin Integration", () => { }); describe("Service principal execution", () => { - test("requests without user token return 401 (policy requires user identity)", async () => { + test("header-less request + default publicRead() + list → 200 (policy decides)", async () => { + mockFilesApi.listDirectoryContents.mockReturnValue( + (async function* () { + yield { + name: "sp-file.txt", + path: "/Volumes/catalog/schema/vol/sp-file.txt", + is_directory: false, + }; + })(), + ); + // Use a unique path to avoid cached results from earlier tests const response = await fetch( `${baseUrl}/api/files/${VOL}/list?path=sp-only`, ); - expect(response.status).toBe(401); + expect(response.status).toBe(200); + }); + + test("header-less request + default publicRead() + upload → 403", async () => { + const response = await fetch( + `${baseUrl}/api/files/${VOL}/upload?path=/Volumes/catalog/schema/vol/sp-upload.bin`, + { + method: "POST", + headers: { "content-length": "0" }, + }, + ); + + expect(response.status).toBe(403); const data = (await response.json()) as { error: string }; - expect(data.error).toMatch(/x-forwarded-user/); + expect(data.error).toMatch(/Policy denied/); + }); + + test("header-less request + denyAll() volume → 403", async () => { + const denySpy = vi.fn().mockReturnValue(false); + const appkit = await createApp({ + plugins: [ + serverPlugin({ + port: 0, + host: "127.0.0.1", + }), + files({ + volumes: { + files: { policy: denySpy }, + }, + }), + ], + }); + + try { + const port = await getListeningPort(appkit.server.getServer()); + const localBase = `http://127.0.0.1:${port}`; + + const response = await fetch( + `${localBase}/api/files/${VOL}/list?path=denied`, + ); + + expect(response.status).toBe(403); + expect(denySpy).toHaveBeenCalled(); + const userArg = denySpy.mock.calls[0][2]; + expect(userArg.isServicePrincipal).toBe(true); + } finally { + const srv = appkit.server.getServer(); + if (srv) { + await new Promise((resolve, reject) => { + srv.close((err) => (err ? reject(err) : resolve())); + }); + } + } + }); + + test("header-less HTTP request → custom policy observes isServicePrincipal: true", async () => { + const policySpy = vi.fn().mockReturnValue(true); + const appkit = await createApp({ + plugins: [ + serverPlugin({ + port: 0, + host: "127.0.0.1", + }), + files({ + volumes: { + files: { policy: policySpy }, + }, + }), + ], + }); + + try { + const port = await getListeningPort(appkit.server.getServer()); + const localBase = `http://127.0.0.1:${port}`; + + mockFilesApi.listDirectoryContents.mockReturnValue( + (async function* () { + yield { + name: "spy-file.txt", + path: "/Volumes/catalog/schema/vol/spy-file.txt", + is_directory: false, + }; + })(), + ); + + const response = await fetch( + `${localBase}/api/files/${VOL}/list?path=spy`, + ); + + expect(response.status).toBe(200); + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg.isServicePrincipal).toBe(true); + } finally { + const srv = appkit.server.getServer(); + if (srv) { + await new Promise((resolve, reject) => { + srv.close((err) => (err ? reject(err) : resolve())); + }); + } + } }); test("requests with user headers also succeed", async () => { diff --git a/packages/appkit/src/plugins/files/tests/plugin.test.ts b/packages/appkit/src/plugins/files/tests/plugin.test.ts index a4b9bea22..5bf7859ca 100644 --- a/packages/appkit/src/plugins/files/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/files/tests/plugin.test.ts @@ -1,6 +1,8 @@ +import { PassThrough, Readable } from "node:stream"; import { mockServiceContext, setupDatabricksEnv } from "@tools/test-helpers"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { ServiceContext } from "../../../context/service-context"; +import { createApp } from "../../../core"; import { AuthenticationError } from "../../../errors"; import { ResourceType } from "../../../registry"; import { @@ -68,6 +70,7 @@ vi.mock("../../../context", async (importOriginal) => { vi.mock("../../../cache", () => ({ CacheManager: { getInstanceSync: vi.fn(() => mockCacheInstance), + getInstance: vi.fn(async () => mockCacheInstance), }, })); @@ -274,7 +277,6 @@ describe("FilesPlugin", () => { test("asUser without user header in production → throws AuthenticationError", () => { const originalEnv = process.env.NODE_ENV; process.env.NODE_ENV = "production"; - try { const plugin = new FilesPlugin(VOLUMES_CONFIG); const handle = plugin.exports()("uploads"); @@ -286,22 +288,18 @@ describe("FilesPlugin", () => { } }); - test("asUser in dev mode returns VolumeAPI with all 9 methods", () => { + test("asUser without user header in development → falls back to SP identity", () => { const originalEnv = process.env.NODE_ENV; process.env.NODE_ENV = "development"; - try { const plugin = new FilesPlugin(VOLUMES_CONFIG); const handle = plugin.exports()("uploads"); - const mockReq = { - header: (name: string) => - name === "x-forwarded-user" ? "test-user" : undefined, - } as any; - const api = handle.asUser(mockReq); + const mockReq = { header: () => undefined } as any; - for (const method of volumeMethods) { - expect(typeof (api as any)[method]).toBe("function"); - } + // Does not throw; returns a VolumeAPI that will run the policy with + // { isServicePrincipal: true } (matching the HTTP-path collapsed semantic). + const api = handle.asUser(mockReq); + expect(typeof api.list).toBe("function"); } finally { process.env.NODE_ENV = originalEnv; } @@ -988,19 +986,32 @@ describe("FilesPlugin", () => { delete process.env.DATABRICKS_VOLUME_WRITEONLY; }); - test("policy volume + no user header (production) → 401", async () => { - const originalEnv = process.env.NODE_ENV; - process.env.NODE_ENV = "production"; + test("header-less HTTP + default publicRead() + read action → 200 with SP user", async () => { + const policySpy = vi.fn().mockReturnValue(true); + const spyConfig = { + volumes: { + spied: { policy: policySpy }, + uploads: {}, + exports: {}, + }, + }; + process.env.DATABRICKS_VOLUME_SPIED = "/Volumes/c/s/spied"; + try { - const plugin = new FilesPlugin(POLICY_CONFIG); + const plugin = new FilesPlugin(spyConfig); const handler = getRouteHandler(plugin, "get", "/list"); const res = mockRes(); - // Override both headers to undefined so _extractUser has no user + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "h.txt", path: "/h.txt", is_directory: false }; + }, + ); + const noUserHeaders: Record = {}; await handler( { - params: { volumeKey: "public" }, + params: { volumeKey: "spied" }, query: {}, headers: noUserHeaders, header: (name: string) => noUserHeaders[name.toLowerCase()], @@ -1008,9 +1019,169 @@ describe("FilesPlugin", () => { res, ); - expect(res.status).toHaveBeenCalledWith(401); + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + expect(policySpy).toHaveBeenCalledWith( + "list", + expect.objectContaining({ volume: "spied" }), + expect.objectContaining({ + id: "test-service-principal", + isServicePrincipal: true, + }), + ); } finally { - process.env.NODE_ENV = originalEnv; + delete process.env.DATABRICKS_VOLUME_SPIED; + } + }); + + test("header-less HTTP + default publicRead() + write action → 403 with SP user", async () => { + const plugin = new FilesPlugin(POLICY_CONFIG); + const handler = getRouteHandler(plugin, "post", "/upload"); + const res = mockRes(); + + const noUserHeaders: Record = { + "content-length": "100", + }; + await handler( + { + params: { volumeKey: "uploads" }, + query: { path: "/test.bin" }, + headers: noUserHeaders, + header: (name: string) => noUserHeaders[name.toLowerCase()], + }, + res, + ); + + expect(res.status).toHaveBeenCalledWith(403); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.stringContaining("Policy denied"), + }), + ); + }); + + test("header-less HTTP + denyAll() → 403 with SP user observed by policy", async () => { + const policySpy = vi.fn(policy.denyAll()); + const spyConfig = { + volumes: { + denied: { policy: policySpy }, + uploads: {}, + exports: {}, + }, + }; + process.env.DATABRICKS_VOLUME_DENIED = "/Volumes/c/s/denied"; + + try { + const plugin = new FilesPlugin(spyConfig); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + const noUserHeaders: Record = {}; + await handler( + { + params: { volumeKey: "denied" }, + query: {}, + headers: noUserHeaders, + header: (name: string) => noUserHeaders[name.toLowerCase()], + }, + res, + ); + + expect(res.status).toHaveBeenCalledWith(403); + expect(policySpy).toHaveBeenCalledWith( + "list", + expect.objectContaining({ volume: "denied" }), + expect.objectContaining({ isServicePrincipal: true }), + ); + } finally { + delete process.env.DATABRICKS_VOLUME_DENIED; + } + }); + + test("header-less HTTP request → policy spy observes { isServicePrincipal: true } and decision is honored", async () => { + const allowSpy = vi.fn().mockResolvedValue(true); + const allowConfig = { + volumes: { + gated: { policy: allowSpy }, + uploads: {}, + exports: {}, + }, + }; + process.env.DATABRICKS_VOLUME_GATED = "/Volumes/c/s/gated"; + + try { + const plugin = new FilesPlugin(allowConfig); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "g.txt", path: "/g.txt", is_directory: false }; + }, + ); + + const noUserHeaders: Record = {}; + await handler( + { + params: { volumeKey: "gated" }, + query: {}, + headers: noUserHeaders, + header: (name: string) => noUserHeaders[name.toLowerCase()], + }, + res, + ); + + expect(allowSpy).toHaveBeenCalledTimes(1); + const userArg = allowSpy.mock.calls[0][2]; + expect(userArg.isServicePrincipal).toBe(true); + expect(userArg.id).toBe("test-service-principal"); + + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + } finally { + delete process.env.DATABRICKS_VOLUME_GATED; + } + }); + + test("header-less HTTP request + policy returns false → 403 (decision honored)", async () => { + const denySpy = vi.fn().mockResolvedValue(false); + const denyConfig = { + volumes: { + gated: { policy: denySpy }, + uploads: {}, + exports: {}, + }, + }; + process.env.DATABRICKS_VOLUME_GATED = "/Volumes/c/s/gated"; + + try { + const plugin = new FilesPlugin(denyConfig); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + const noUserHeaders: Record = {}; + await handler( + { + params: { volumeKey: "gated" }, + query: {}, + headers: noUserHeaders, + header: (name: string) => noUserHeaders[name.toLowerCase()], + }, + res, + ); + + expect(denySpy).toHaveBeenCalledTimes(1); + const userArg = denySpy.mock.calls[0][2]; + expect(userArg.isServicePrincipal).toBe(true); + expect(res.status).toHaveBeenCalledWith(403); + } finally { + delete process.env.DATABRICKS_VOLUME_GATED; } }); @@ -1349,7 +1520,7 @@ describe("FilesPlugin", () => { } }); - test("asUser() call → policy receives user without isServicePrincipal", async () => { + test("asUser() call with full OBO headers → policy receives { id, isServicePrincipal: false }", async () => { const policySpy = vi.fn().mockReturnValue(true); const spyConfig = { volumes: { @@ -1378,9 +1549,13 @@ describe("FilesPlugin", () => { expect.objectContaining({ volume: "spied" }), expect.objectContaining({ id: "test-user" }), ); - // Should NOT have isServicePrincipal set + // `_extractUser` MUST mark a real-user identity explicitly so + // policies can reliably distinguish user vs SP. Returning + // `undefined` here would let `usersOnly`-style policies tied to + // `!user.isServicePrincipal` behave inconsistently (the legacy + // bug fixed by the asUser hardening). const userArg = policySpy.mock.calls[0][2]; - expect(userArg.isServicePrincipal).toBeUndefined(); + expect(userArg.isServicePrincipal).toBe(false); } finally { delete process.env.DATABRICKS_VOLUME_SPIED; } @@ -1673,6 +1848,1937 @@ describe("FilesPlugin", () => { }); }); + describe("_resolveAuth config inheritance", () => { + test("volume-level auth overrides plugin default", () => { + const plugin = new FilesPlugin({ + auth: "service-principal", + volumes: { + uploads: { auth: "on-behalf-of-user" }, + exports: {}, + }, + }); + expect((plugin as any)._resolveAuth("uploads")).toBe("on-behalf-of-user"); + }); + + test("volume without auth inherits plugin default", () => { + const plugin = new FilesPlugin({ + auth: "on-behalf-of-user", + volumes: { + uploads: {}, + exports: {}, + }, + }); + expect((plugin as any)._resolveAuth("exports")).toBe("on-behalf-of-user"); + }); + + test("neither volume nor plugin sets auth → defaults to service-principal", () => { + const plugin = new FilesPlugin({ + volumes: { + uploads: {}, + exports: {}, + }, + }); + expect((plugin as any)._resolveAuth("uploads")).toBe("service-principal"); + }); + + test("createApp round-trip preserves auth field through public factory", async () => { + // Satisfy the manifest's static `Files` resource requirement so + // ResourceRegistry.enforceValidation passes during createApp. + process.env.DATABRICKS_VOLUME_FILES = "/Volumes/catalog/schema/files"; + + // Capture the FilesPlugin instance constructed by createApp by spying on + // exports() (called when AppKit's plugin getter fires). This exercises + // the public construction path so any future runtime config validator + // (e.g. Zod) that drops the `auth` field would break this test. + let captured: FilesPlugin | undefined; + const exportsSpy = vi + .spyOn(FilesPlugin.prototype, "exports") + .mockImplementation(function (this: FilesPlugin) { + captured = this; + // Return a minimal stub; we only care about capturing `this`. + const stub = (() => { + throw new Error("not used in this test"); + }) as unknown as ReturnType; + (stub as any).volume = () => { + throw new Error("not used in this test"); + }; + return stub; + }); + + try { + const appkit = await createApp({ + plugins: [ + files({ + auth: "on-behalf-of-user", + volumes: { + uploads: { auth: "service-principal" }, + exports: {}, + }, + }), + ], + }); + + // Trigger the AppKit getter so wrapWithAsUser invokes exports() + // and our spy captures the FilesPlugin instance. + void (appkit as unknown as { files: unknown }).files; + + expect(captured).toBeInstanceOf(FilesPlugin); + // Volume override wins over plugin-level default. + expect((captured as any)._resolveAuth("uploads")).toBe( + "service-principal", + ); + // Volume with no explicit auth inherits the plugin default. + expect((captured as any)._resolveAuth("exports")).toBe( + "on-behalf-of-user", + ); + } finally { + exportsSpy.mockRestore(); + delete process.env.DATABRICKS_VOLUME_FILES; + } + }); + }); + + describe("OBO identity selection", () => { + function getRouteHandler( + plugin: FilesPlugin, + method: "get" | "post" | "delete", + pathSuffix: string, + ) { + const mockRouter = { + use: vi.fn(), + get: vi.fn(), + post: vi.fn(), + put: vi.fn(), + delete: vi.fn(), + patch: vi.fn(), + } as any; + plugin.injectRoutes(mockRouter); + const call = mockRouter[method].mock.calls.find( + (c: unknown[]) => + typeof c[0] === "string" && (c[0] as string).endsWith(pathSuffix), + ); + return call[call.length - 1] as (req: any, res: any) => Promise; + } + + function mockRes() { + const res: any = { headersSent: false }; + res.status = vi.fn().mockReturnValue(res); + res.json = vi.fn().mockReturnValue(res); + res.type = vi.fn().mockReturnValue(res); + res.send = vi.fn().mockReturnValue(res); + res.setHeader = vi.fn().mockReturnValue(res); + res.end = vi.fn(); + return res; + } + + function mockReq( + volumeKey: string, + headers: Record, + overrides: Record = {}, + ) { + return { + params: { volumeKey }, + query: {}, + ...overrides, + headers, + header: (name: string) => headers[name.toLowerCase()], + }; + } + + let originalNodeEnv: string | undefined; + + beforeEach(() => { + originalNodeEnv = process.env.NODE_ENV; + process.env.DATABRICKS_VOLUME_OBO_VOL = "/Volumes/c/s/obo"; + }); + + afterEach(() => { + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + delete process.env.DATABRICKS_VOLUME_OBO_VOL; + }); + + test("OBO volume + valid token → policy receives { isServicePrincipal: false, id: }", async () => { + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "o.txt", path: "/o.txt", is_directory: false }; + }, + ); + + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "test-token", + "x-forwarded-user": "alice@example.com", + }), + res, + ); + + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg).toEqual({ + id: "alice@example.com", + isServicePrincipal: false, + }); + + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + }); + + test("OBO volume + missing token + NODE_ENV != 'development' → 401, no SDK call", async () => { + process.env.NODE_ENV = "production"; + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + // No x-forwarded-access-token header. + await handler( + mockReq("obo_vol", { + "x-forwarded-user": "alice@example.com", + }), + res, + ); + + expect(res.status).toHaveBeenCalledWith(401); + const errBody = res.json.mock.calls[0][0]; + // Public body is generic — internal "missing x-forwarded-access-token" + // detail is server-side log only (CWE-209 hardening). + expect(errBody).toEqual({ + error: "Unauthorized", + plugin: "files", + }); + + // Policy must not have been evaluated and the SDK must not have been + // called. + expect(policySpy).not.toHaveBeenCalled(); + expect(mockClient.files.listDirectoryContents).not.toHaveBeenCalled(); + }); + + test("OBO volume + missing token + NODE_ENV === 'development' → exactly one warn, SP fallback proceeds", async () => { + process.env.NODE_ENV = "development"; + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "d.txt", path: "/d.txt", is_directory: false }; + }, + ); + + // The plugin's logger.warn delegates to console.warn. Spy on console.warn + // and filter for the unique substring of our dev-fallback message. + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + await handler( + mockReq("obo_vol", { + "x-forwarded-user": "alice@example.com", + }), + res, + ); + + const matchingWarns = warnSpy.mock.calls.filter((args) => + args.some( + (a) => + typeof a === "string" && + a.includes( + "OBO volume requested without x-forwarded-access-token", + ), + ), + ); + expect(matchingWarns).toHaveLength(1); + } finally { + warnSpy.mockRestore(); + } + + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg).toEqual( + expect.objectContaining({ isServicePrincipal: true }), + ); + + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + }); + + test("OBO volume + valid token + policy denies → 403 PolicyDeniedError", async () => { + const policySpy = vi.fn().mockReturnValue(false); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "test-token", + "x-forwarded-user": "alice@example.com", + }), + res, + ); + + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg).toEqual({ + id: "alice@example.com", + isServicePrincipal: false, + }); + + expect(res.status).toHaveBeenCalledWith(403); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.stringContaining("Policy denied"), + plugin: "files", + }), + ); + }); + }); + + describe("OBO read routes", () => { + function getRouteHandler( + plugin: FilesPlugin, + method: "get" | "post" | "delete", + pathSuffix: string, + ) { + const mockRouter = { + use: vi.fn(), + get: vi.fn(), + post: vi.fn(), + put: vi.fn(), + delete: vi.fn(), + patch: vi.fn(), + } as any; + plugin.injectRoutes(mockRouter); + const call = mockRouter[method].mock.calls.find( + (c: unknown[]) => + typeof c[0] === "string" && (c[0] as string).endsWith(pathSuffix), + ); + return call[call.length - 1] as (req: any, res: any) => Promise; + } + + function mockRes() { + // PassThrough gives us a real Writable so the streaming `/read` handler + // can `pipe(res)` without crashing. Express-style helpers (status, json, + // type, send, setHeader) are layered on top as spies. + const stream = new PassThrough(); + const res: any = stream; + res.headersSent = false; + res.status = vi.fn().mockReturnValue(res); + res.json = vi.fn().mockReturnValue(res); + res.type = vi.fn().mockReturnValue(res); + res.send = vi.fn().mockReturnValue(res); + res.setHeader = vi.fn().mockReturnValue(res); + res.destroy = vi.fn(); + return res; + } + + function mockReq( + volumeKey: string, + headers: Record, + overrides: Record = {}, + ) { + return { + params: { volumeKey }, + query: {}, + ...overrides, + headers, + header: (name: string) => headers[name.toLowerCase()], + }; + } + + /** + * Replace the default `getCurrentUserId` mock with one that delegates to + * the real implementation, so that calls inside `runInUserContext` resolve + * to the wrapped UserContext's `userId` (and the per-user cache key + * derived from it). + */ + async function useRealGetCurrentUserId() { + const actual = + await vi.importActual( + "../../../context", + ); + const ctx = await import("../../../context"); + vi.mocked(ctx.getCurrentUserId).mockImplementation( + actual.getCurrentUserId, + ); + } + + let originalNodeEnv: string | undefined; + + beforeEach(() => { + originalNodeEnv = process.env.NODE_ENV; + process.env.DATABRICKS_VOLUME_OBO_VOL = "/Volumes/c/s/obo"; + }); + + afterEach(() => { + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + delete process.env.DATABRICKS_VOLUME_OBO_VOL; + }); + + test("OBO list + valid token wraps SDK call in user context (alice's userId resolves inside the wrapped fn)", async () => { + await useRealGetCurrentUserId(); + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + const res = mockRes(); + + // Snapshot the user IDs observed inside each SDK invocation so we can + // assert that the SDK call ran inside `runInUserContext` with the + // expected user identity. + const observedUserIds: string[] = []; + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + // getCurrentUserId() inside the wrapped fn should resolve to alice. + const ctx = await import("../../../context"); + observedUserIds.push(ctx.getCurrentUserId()); + yield { name: "o.txt", path: "/o.txt", is_directory: false }; + }, + ); + + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "test-token", + "x-forwarded-user": "alice@example.com", + }), + res, + ); + + expect(observedUserIds).toEqual(["alice@example.com"]); + + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + expect(statusCodes).not.toContain(500); + }); + + test("OBO read happy path: valid token + policy allows + UC allows → 200", async () => { + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/read"); + const res = mockRes(); + + // The connector reads via files.download — return a valid 200-ish + // response with content body. + mockClient.files.download.mockImplementation(async () => ({ + contents: new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode("hello")); + controller.close(); + }, + }), + })); + + await handler( + mockReq( + "obo_vol", + { + "x-forwarded-access-token": "test-token", + "x-forwarded-user": "alice@example.com", + }, + { query: { path: "hello.txt" } }, + ), + res, + ); + + // Both gates passed: policy was consulted with OBO identity, and the + // SDK's response was relayed back to the client. + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg).toEqual({ + id: "alice@example.com", + isServicePrincipal: false, + }); + + // 2xx path: handler set Content-Type to text/plain and entered the + // streaming branch (no error status code was set). The body is piped + // through the PassThrough — exact byte-level assertion is covered by + // integration tests; here we just verify the handler took the success + // branch. + const statusCodes = (res.status.mock.calls as number[][]).map( + (c) => c[0], + ); + expect(statusCodes).not.toContain(401); + expect(statusCodes).not.toContain(403); + expect(statusCodes).not.toContain(500); + expect(res.type).toHaveBeenCalledWith("text/plain"); + }); + + test("OBO read cache is DISABLED: cross-user reads do not share cache state", async () => { + // Per Fix 3: the read cache is keyed by `getCurrentUserId()`, so user + // A's writes can only invalidate user A's cache entry. Cross-user + // staleness was the bug. The chosen mitigation (Option B) is to + // disable the read cache on OBO volumes entirely — this test pins + // that contract: OBO reads must NOT consult `getOrExecute`. The + // alternative (Option A: per-(volume, path) generation counters) + // would re-enable cache here. + await useRealGetCurrentUserId(); + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + // Alice's request. + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + // Bob's request — same volume, same path. + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "bob-token", + "x-forwarded-user": "bob@example.com", + }), + mockRes(), + ); + + // Cache is disabled on OBO: `getOrExecute` is bypassed. The SDK + // must execute on every request — no cross-user staleness possible. + expect(mockCacheInstance.getOrExecute).not.toHaveBeenCalled(); + expect(mockClient.files.listDirectoryContents).toHaveBeenCalledTimes(2); + }); + + test("SP volume reads still use the cache (cache is only disabled for OBO)", async () => { + await useRealGetCurrentUserId(); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + // SP-mode volume (default auth). + uploads: { policy: policy.allowAll() }, + exports: {}, + }, + }); + + const listHandler = getRouteHandler(plugin, "get", "/list"); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + // SP volume request — must consult the cache (cache enabled). + await listHandler( + mockReq("uploads", { + "x-forwarded-access-token": "sp-token-ignored", + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + // OBO volume request — must skip the cache (cache disabled on OBO). + await listHandler( + mockReq("obo_vol", { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + const calls = mockCacheInstance.getOrExecute.mock.calls; + // Exactly one cache consultation — the SP volume's. The OBO request + // bypassed the cache entirely. + expect(calls).toHaveLength(1); + const spCacheKey = calls[0][0]; + // The SP cache entry's array form is namespaced by volumeKey, so the + // OBO volume could not have collided even if its read had been + // cached. + expect(spCacheKey).toEqual( + expect.arrayContaining([expect.stringContaining("uploads")]), + ); + }); + + /** + * Fix 2 regression: every OBO HTTP request must build the UserContext + * AT MOST ONCE. The previous implementation invoked + * `_effectiveAuthMode` and then `_runWithAuth` separately — each + * called `_buildUserContextOrNull`, which calls + * `ServiceContext.createUserContext`, which constructs a fresh + * `WorkspaceClient`. Two builds per request was pure throwaway + * overhead. The single-pass `_resolveAuthForRequest` resolver collapses + * them. This test pins the contract: ONE `createUserContext` call per + * OBO HTTP request. + */ + test("OBO HTTP request builds the UserContext exactly once (no duplicate WorkspaceClient allocation)", async () => { + const policySpy = vi.fn().mockReturnValue(true); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "get", "/list"); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + // Reset before our request — `mockServiceContext` may have been + // consulted during plugin setup in earlier tests in the suite. + serviceContextMock.createUserContextSpy.mockClear(); + + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + // Exactly one. Two would mean the duplicate-allocation regression + // is back. + expect(serviceContextMock.createUserContextSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe("OBO write routes", () => { + function getRouteHandler( + plugin: FilesPlugin, + method: "get" | "post" | "delete", + pathSuffix: string, + ) { + const mockRouter = { + use: vi.fn(), + get: vi.fn(), + post: vi.fn(), + put: vi.fn(), + delete: vi.fn(), + patch: vi.fn(), + } as any; + plugin.injectRoutes(mockRouter); + const call = mockRouter[method].mock.calls.find( + (c: unknown[]) => + typeof c[0] === "string" && (c[0] as string).endsWith(pathSuffix), + ); + return call[call.length - 1] as (req: any, res: any) => Promise; + } + + function mockRes() { + const res: any = { headersSent: false }; + res.status = vi.fn().mockReturnValue(res); + res.json = vi.fn().mockReturnValue(res); + res.type = vi.fn().mockReturnValue(res); + res.send = vi.fn().mockReturnValue(res); + res.setHeader = vi.fn().mockReturnValue(res); + res.end = vi.fn(); + return res; + } + + /** + * `_handleUpload` calls `Readable.toWeb(req)` on the express request, so + * the mock req must be (or extend) a real Node Readable stream. Build one + * from the supplied body bytes and tack on the express-shaped helpers. + */ + function mockUploadReq( + volumeKey: string, + headers: Record, + body: string | Buffer = "", + overrides: Record = {}, + ) { + const buf = Buffer.isBuffer(body) ? body : Buffer.from(body); + const stream = Readable.from([buf]) as Readable & { + params?: any; + query?: any; + headers?: any; + header?: (name: string) => string | undefined; + }; + stream.params = { volumeKey }; + // Relative path so the connector's `resolvePath` joins it with the + // volume's defaultVolume from `DATABRICKS_VOLUME_OBO_VOL`. + stream.query = { path: "upload-target.bin" }; + stream.headers = headers; + stream.header = (name: string) => headers[name.toLowerCase()]; + Object.assign(stream, overrides); + return stream; + } + + function mockReq( + volumeKey: string, + headers: Record, + overrides: Record = {}, + ) { + return { + params: { volumeKey }, + query: {}, + body: {}, + ...overrides, + headers, + header: (name: string) => headers[name.toLowerCase()], + }; + } + + /** + * Replace the default `getCurrentUserId` mock with the real implementation + * so calls inside `runInUserContext` resolve to the wrapped UserContext's + * `userId` (mirrors the helper used by the `OBO read routes` block). + */ + async function useRealGetCurrentUserId() { + const actual = + await vi.importActual( + "../../../context", + ); + const ctx = await import("../../../context"); + vi.mocked(ctx.getCurrentUserId).mockImplementation( + actual.getCurrentUserId, + ); + } + + /** + * Replace the default `getWorkspaceClient` mock with the real + * implementation so that calls inside `runInUserContext` return the + * UserContext's `client` (the user-token-authenticated WorkspaceClient) + * while calls outside the user context fall back to the + * service-principal client from `ServiceContext.get()`. Required to + * exercise the real `_runWithAuth → runInUserContext → getWorkspaceClient + * → client.config.authenticate → fetch headers` chain. + */ + async function useRealGetWorkspaceClient() { + const actual = + await vi.importActual( + "../../../context", + ); + const ctx = await import("../../../context"); + vi.mocked(ctx.getWorkspaceClient).mockImplementation( + actual.getWorkspaceClient, + ); + } + + let originalNodeEnv: string | undefined; + + beforeEach(() => { + originalNodeEnv = process.env.NODE_ENV; + process.env.DATABRICKS_VOLUME_OBO_VOL = "/Volumes/c/s/obo"; + }); + + afterEach(() => { + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + delete process.env.DATABRICKS_VOLUME_OBO_VOL; + vi.unstubAllGlobals(); + }); + + /** + * NON-NEGOTIABLE upload-headers contract. + * + * `_handleUpload` does a hand-rolled `fetch PUT` (not a typed SDK call) + * via the connector's `upload()`. Inside `_runWithAuth` on an OBO volume, + * the chain is: + * + * getWorkspaceClient() → user-token WorkspaceClient + * client.config.authenticate(h) → injects "Bearer " + * fetch(url, { headers }) → outgoing request as the user + * + * This test pins that chain end-to-end. If any future SDK upgrade or + * refactor changes `client.config.authenticate`'s signature, removes the + * `runInUserContext` wrap from `_handleUpload`, or rewires + * `getWorkspaceClient()` so it returns the SP client inside the OBO + * scope, the user-token Authorization header will not reach `fetch` and + * this assertion fails. SP-token would silently leak to UC otherwise. + */ + test("OBO upload: outgoing fetch PUT carries user-token Authorization header (not SP)", async () => { + await useRealGetCurrentUserId(); + await useRealGetWorkspaceClient(); + + // SP-token marker — what the existing mockClient would inject if the + // OBO wrap leaked. We assert this NEVER reaches the outgoing fetch. + mockClient.config.authenticate.mockImplementation( + async (headers: Headers) => { + headers.set("Authorization", "Bearer SP-TOKEN"); + }, + ); + + // User-token marker — what the OBO scope MUST inject. + const userClient = { + config: { + host: "https://test.databricks.com", + authenticate: vi.fn(async (headers: Headers) => { + headers.set("Authorization", "Bearer USER-TOKEN-FOO"); + }), + }, + // `_handleUpload` only routes through the connector's `upload()`, + // which uses host + authenticate + fetch. No `files.*` accessor is + // touched on the user client during this path. + }; + + // Wire `_buildUserContextOrNull → ServiceContext.createUserContext` to + // return a UserContext whose `client` is our user-token client. This + // is the same hook `mockServiceContext` already installs; we just + // override its impl for this test. + serviceContextMock.createUserContextSpy.mockImplementation( + (_token: string, userId: string) => ({ + client: userClient as any, + userId, + warehouseId: serviceContextMock.serviceContext.warehouseId, + workspaceId: serviceContextMock.serviceContext.workspaceId, + isUserContext: true, + }), + ); + + // Capture the outgoing PUT. + const fetchSpy = vi + .fn() + .mockResolvedValue({ ok: true, status: 200, text: async () => "" }); + vi.stubGlobal("fetch", fetchSpy); + + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "post", "/upload"); + const res = mockRes(); + + await handler( + mockUploadReq( + "obo_vol", + { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + "content-length": "5", + }, + "hello", + ), + res, + ); + + // The user-token authenticator was consulted exactly when upload ran. + expect(userClient.config.authenticate).toHaveBeenCalledTimes(1); + + // The hand-rolled fetch PUT happened exactly once. + expect(fetchSpy).toHaveBeenCalledTimes(1); + const fetchArgs = fetchSpy.mock.calls[0]; + const init = fetchArgs[1] as RequestInit & { headers: Headers }; + expect(init.method).toBe("PUT"); + + // The contract — proves the user-token Authorization header reached + // fetch. Toggling the `_runWithAuth` wrap off in `_handleUpload` + // breaks this assertion (fetch would carry "Bearer SP-TOKEN" instead). + expect(init.headers.get("Authorization")).toBe("Bearer USER-TOKEN-FOO"); + expect(init.headers.get("Authorization")).not.toBe("Bearer SP-TOKEN"); + + // Defense-in-depth: SP authenticator was NOT called along the OBO path. + expect(mockClient.config.authenticate).not.toHaveBeenCalled(); + }); + + test("OBO upload + missing token + NODE_ENV=production → 401 before any SDK or fetch call", async () => { + process.env.NODE_ENV = "production"; + + const fetchSpy = vi.fn(); + vi.stubGlobal("fetch", fetchSpy); + + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "post", "/upload"); + const res = mockRes(); + + // No x-forwarded-access-token header. + await handler( + mockUploadReq( + "obo_vol", + { + "x-forwarded-user": "alice@example.com", + "content-length": "5", + }, + "hello", + ), + res, + ); + + expect(res.status).toHaveBeenCalledWith(401); + const errBody = res.json.mock.calls[0][0]; + // Public body is generic — internal "missing x-forwarded-access-token" + // detail is server-side log only (CWE-209 hardening). + expect(errBody).toEqual({ + error: "Unauthorized", + plugin: "files", + }); + + // Neither the SDK upload nor the hand-rolled fetch ran. + expect(mockClient.files.upload).not.toHaveBeenCalled(); + expect(fetchSpy).not.toHaveBeenCalled(); + }); + + test("OBO mkdir + policy denies → 403 PolicyDeniedError; SDK not invoked", async () => { + const policySpy = vi.fn().mockReturnValue(false); + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policySpy }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "post", "/mkdir"); + const res = mockRes(); + + const fetchSpy = vi.fn(); + vi.stubGlobal("fetch", fetchSpy); + + await handler( + mockReq( + "obo_vol", + { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }, + { body: { path: "/new-dir" } }, + ), + res, + ); + + // Policy was consulted with the OBO identity. + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg).toEqual({ + id: "alice@example.com", + isServicePrincipal: false, + }); + + // 403 PolicyDeniedError shape. + expect(res.status).toHaveBeenCalledWith(403); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.stringContaining("Policy denied"), + plugin: "files", + }), + ); + + // SDK + fetch not invoked. + expect(mockClient.files.createDirectory).not.toHaveBeenCalled(); + expect(fetchSpy).not.toHaveBeenCalled(); + }); + + test("OBO delete + valid token + UC denies → user-token client invoked, error propagated", async () => { + await useRealGetCurrentUserId(); + await useRealGetWorkspaceClient(); + + // Distinct user-token client with a `files.delete` that mimics a UC + // failure (e.g. 403 from UC because the user lacks privilege). + const userClient = { + config: { + host: "https://test.databricks.com", + authenticate: vi.fn(async (h: Headers) => { + h.set("Authorization", "Bearer USER-TOKEN-DEL"); + }), + }, + files: { + delete: vi.fn(async () => { + throw new MockApiError("UC denied", 403); + }), + }, + }; + + serviceContextMock.createUserContextSpy.mockImplementation( + (_token: string, userId: string) => ({ + client: userClient as any, + userId, + warehouseId: serviceContextMock.serviceContext.warehouseId, + workspaceId: serviceContextMock.serviceContext.workspaceId, + isUserContext: true, + }), + ); + + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + uploads: {}, + exports: {}, + }, + }); + const handler = getRouteHandler(plugin, "delete", "/:volumeKey"); + const res = mockRes(); + + await handler( + mockReq( + "obo_vol", + { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }, + { query: { path: "doomed.txt" } }, + ), + res, + ); + + // The user-token client was used, not the SP one. + expect(userClient.files.delete).toHaveBeenCalledTimes(1); + expect(mockClient.files.delete).not.toHaveBeenCalled(); + + // The UC error surfaced as 403 (the ApiError statusCode). + expect(res.status).toHaveBeenCalledWith(403); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ plugin: "files" }), + ); + }); + + /** + * Fix 3 regression: SP-volume write to a nested path must invalidate + * the parent directory's list cache, not the file path's. The previous + * code passed `path` (file path) directly to the cache key as the + * "path segment" — so `_handleList` (which keys by directory) and + * `_invalidateListCache` (which keyed by file) used different segments. + */ + test("SP write of /Volumes/.../foo/bar.txt invalidates the parent /foo list cache key (not the file path)", async () => { + // SP volume — uses the cache. + process.env.DATABRICKS_VOLUME_SP_VOL = "/Volumes/c/s/sp"; + try { + const plugin = new FilesPlugin({ + volumes: { + sp_vol: { policy: policy.allowAll() }, + uploads: {}, + exports: {}, + }, + }); + const mkdirHandler = getRouteHandler(plugin, "post", "/mkdir"); + + mockClient.files.createDirectory.mockResolvedValue(undefined); + + // Track which (parts, userKey) pairs go through generateKey so we + // can match the invalidation segment exactly. + const generateKeyCalls: Array<{ + parts: (string | number | object)[]; + userKey: string; + }> = []; + mockCacheInstance.generateKey.mockImplementation( + (parts: (string | number | object)[], userKey: string) => { + generateKeyCalls.push({ parts, userKey }); + return "stub-key"; + }, + ); + + await mkdirHandler( + mockReq("sp_vol", {}, { body: { path: "/Volumes/c/s/sp/foo/bar" } }), + mockRes(), + ); + + // Exactly one list-cache invalidation key was constructed. + const listInvalidations = generateKeyCalls.filter( + (c) => Array.isArray(c.parts) && c.parts[0] === "files:sp_vol:list", + ); + expect(listInvalidations).toHaveLength(1); + + // The path-segment is the PARENT directory (resolved), not the + // written path. `parentDirectory("/Volumes/c/s/sp/foo/bar")` + // returns `"/Volumes/c/s/sp/foo"`, which the connector resolves + // unchanged because it's already absolute and starts with /Volumes/. + expect(listInvalidations[0].parts[1]).toBe("/Volumes/c/s/sp/foo"); + // Defense-in-depth: the file path itself must NOT appear as the + // segment. + expect(listInvalidations[0].parts[1]).not.toBe( + "/Volumes/c/s/sp/foo/bar", + ); + } finally { + delete process.env.DATABRICKS_VOLUME_SP_VOL; + } + }); + + /** + * Fix 3 regression: SP root-level write (e.g. mkdir of a relative path + * like "newdir") must invalidate the `"__root__"` sentinel that + * `_handleList` uses for rootless listings. + */ + test("SP write of root-level path uses the __root__ sentinel for invalidation (matches _handleList's rootless cache key)", async () => { + const plugin = new FilesPlugin({ + volumes: { + uploads: { policy: policy.allowAll() }, + exports: {}, + }, + }); + const mkdirHandler = getRouteHandler(plugin, "post", "/mkdir"); + + mockClient.files.createDirectory.mockResolvedValue(undefined); + + const generateKeyCalls: Array<{ + parts: (string | number | object)[]; + userKey: string; + }> = []; + mockCacheInstance.generateKey.mockImplementation( + (parts: (string | number | object)[], userKey: string) => { + generateKeyCalls.push({ parts, userKey }); + return "stub-key"; + }, + ); + + // Relative path "newdir" → parentDirectory returns "" → root-level + // → must use the "__root__" sentinel. + await mkdirHandler( + mockReq("uploads", {}, { body: { path: "newdir" } }), + mockRes(), + ); + + const listInvalidations = generateKeyCalls.filter( + (c) => Array.isArray(c.parts) && c.parts[0] === "files:uploads:list", + ); + expect(listInvalidations).toHaveLength(1); + expect(listInvalidations[0].parts[1]).toBe("__root__"); + }); + + /** + * Fix A regression: SP-volume writes must AWAIT the underlying + * `cache.delete()` BEFORE sending the HTTP success response. Otherwise + * a client that fires a follow-up `GET /list` in the same tick can race + * the still-pending invalidation and observe pre-write data. + * + * Structural assertion (timing-independent, bug-sensitive): we install + * a `cache.delete` implementation backed by a deferred promise we + * control, then dispatch the `mkdir` handler WITHOUT awaiting it. + * Once `cache.delete` is observed to have been called, we drain a + * generous number of microtasks AND macrotasks while the deferred is + * still pending. The contract is that `res.json` MUST NOT fire until + * we resolve the deferred — proving the response is gated on the + * invalidation having actually completed (not just been issued). + * + * If anyone reverts to `this._invalidateListCache(...)` without + * `await`, `cache.delete()` is left dangling but `res.json` proceeds + * synchronously after it — making the "after delete called, before + * delete resolved" window observable, and the assertion below would + * fail. + */ + test("SP write awaits cache.delete BEFORE sending the response (no write→read race)", async () => { + // Restore the default (mocked) `getWorkspaceClient` and + // `getCurrentUserId`, since earlier tests in this block install the + // REAL implementations and `vi.clearAllMocks` does NOT reset + // implementations. + const ctx = await import("../../../context"); + vi.mocked(ctx.getWorkspaceClient).mockImplementation( + () => mockClient as any, + ); + vi.mocked(ctx.getCurrentUserId).mockImplementation( + () => "test-service-principal", + ); + + process.env.DATABRICKS_VOLUME_SP_VOL = "/Volumes/c/s/sp"; + try { + const plugin = new FilesPlugin({ + volumes: { + sp_vol: { policy: policy.allowAll() }, + uploads: {}, + exports: {}, + }, + }); + const mkdirHandler = getRouteHandler(plugin, "post", "/mkdir"); + + mockClient.files.createDirectory.mockResolvedValue(undefined); + mockCacheInstance.generateKey.mockReturnValue("stub-key"); + + // Deferred promise that gates the cache delete. The handler must + // await this before writing the success response. + let releaseDelete!: () => void; + const deletePending = new Promise((resolve) => { + releaseDelete = resolve; + }); + mockCacheInstance.delete.mockImplementation( + async () => await deletePending, + ); + + const res = mockRes(); + + // Fire the write WITHOUT awaiting — we want to inspect state while + // the handler is parked inside `_invalidateListCache`. + const handlerDone = mkdirHandler( + mockReq("sp_vol", {}, { body: { path: "/Volumes/c/s/sp/foo/bar" } }), + res, + ); + + // Wait until `cache.delete` is invoked (ie the SDK call has + // resolved and the handler has reached the invalidation step). + // Polling the mock's call count avoids any fixed-microtask budget + // assumptions about how many awaits the interceptor stack adds. + // Use setImmediate to also drain macrotask queue items (telemetry/ + // timeout interceptors may use setTimeout under the hood). + const deadline = Date.now() + 1000; + while ( + mockCacheInstance.delete.mock.calls.length === 0 && + Date.now() < deadline + ) { + await new Promise((resolve) => setImmediate(resolve)); + } + + expect(mockClient.files.createDirectory).toHaveBeenCalledTimes(1); + expect(mockCacheInstance.delete).toHaveBeenCalledTimes(1); + + // Critical assertion: drain plenty of microtasks AND macrotasks + // while `cache.delete` is still parked on the deferred. If the + // handler was NOT awaiting the invalidation, `res.json` would + // already have been called by now (the missing-await bug). With + // the fix, the response is gated on `releaseDelete()` below. + for (let i = 0; i < 50; i++) { + await new Promise((resolve) => setImmediate(resolve)); + } + expect(res.json).not.toHaveBeenCalled(); + + // Release the cache delete; the handler now finishes and responds. + releaseDelete(); + await handlerDone; + + expect(res.json).toHaveBeenCalledTimes(1); + } finally { + delete process.env.DATABRICKS_VOLUME_SP_VOL; + } + }); + + /** + * Fix 3 regression: cross-user OBO read freshness. The OBO read cache + * is keyed by `getCurrentUserId()`, so user A's writes can only + * invalidate user A's cache entry. With cache disabled on OBO, user B + * must see fresh data after user A writes. + */ + test("OBO write by user A → user B's next read sees fresh data (cross-user freshness; cache disabled on OBO)", async () => { + // This test relies on the DEFAULT mocked `getWorkspaceClient` and + // `getCurrentUserId` (always returning the SP fixture's + // `mockClient`). Earlier tests in this block install the REAL impls + // via `useRealGetWorkspaceClient`/`useRealGetCurrentUserId`, and + // Vitest's `vi.clearAllMocks` between tests does NOT reset + // implementations — so we restore the defaults explicitly. + const ctx = await import("../../../context"); + vi.mocked(ctx.getWorkspaceClient).mockImplementation( + () => mockClient as any, + ); + vi.mocked(ctx.getCurrentUserId).mockImplementation( + () => "test-service-principal", + ); + + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + uploads: {}, + exports: {}, + }, + }); + const listHandler = getRouteHandler(plugin, "get", "/list"); + const uploadHandler = getRouteHandler(plugin, "post", "/upload"); + + // The connector's `list()` aggregates async-iterable yields into an + // array. Initial listings return [], the post-upload listing returns + // [{...}]. We toggle the return AFTER alice's upload has reached + // the SDK to simulate cross-user freshness. + let postUploadVisible = false; + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + if (postUploadVisible) { + yield { + name: "fresh.txt", + path: "/fresh.txt", + is_directory: false, + }; + } + }, + ); + mockClient.config.authenticate.mockImplementation(async (h: Headers) => { + h.set("Authorization", "Bearer ALICE"); + }); + const fetchSpy = vi + .fn() + .mockResolvedValue({ ok: true, status: 200, text: async () => "" }); + vi.stubGlobal("fetch", fetchSpy); + + // Bob's first list — empty. + const bobRes1 = mockRes(); + await listHandler( + mockReq("obo_vol", { + "x-forwarded-access-token": "bob-token", + "x-forwarded-user": "bob@example.com", + }), + bobRes1, + ); + + // Alice uploads. + postUploadVisible = true; + const aliceRes = mockRes(); + await uploadHandler( + mockUploadReq( + "obo_vol", + { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + "content-length": "5", + }, + "hello", + ), + aliceRes, + ); + + // Bob's second list — must see the fresh file because the OBO read + // cache is disabled (cross-user freshness guard). + const bobRes2 = mockRes(); + await listHandler( + mockReq("obo_vol", { + "x-forwarded-access-token": "bob-token", + "x-forwarded-user": "bob@example.com", + }), + bobRes2, + ); + + // Bob's first response was empty, second has the fresh entry. + const bobJson1 = bobRes1.json.mock.calls[0]?.[0] ?? []; + const bobJson2 = bobRes2.json.mock.calls[0]?.[0] ?? []; + expect(Array.isArray(bobJson1) ? bobJson1.length : 0).toBe(0); + expect(Array.isArray(bobJson2) ? bobJson2.length : 0).toBeGreaterThan(0); + }); + }); + + describe("VolumeHandle.asUser SDK identity", () => { + /** + * Replace the default `getWorkspaceClient` mock with the real + * implementation so that calls inside `runInUserContext` return the + * UserContext's `client` (the user-token-authenticated WorkspaceClient) + * while calls outside the user context fall back to the + * service-principal client from `ServiceContext.get()`. + */ + async function useRealGetWorkspaceClient() { + const actual = + await vi.importActual( + "../../../context", + ); + const ctx = await import("../../../context"); + vi.mocked(ctx.getWorkspaceClient).mockImplementation( + actual.getWorkspaceClient, + ); + } + + /** + * Build a minimal mock express.Request with the supplied headers. Only + * `header(name)` is consulted by `VolumeHandle.asUser`. + */ + function mockReq(headers: Record) { + return { + header: (name: string) => headers[name.toLowerCase()], + } as any; + } + + let originalNodeEnv: string | undefined; + + beforeEach(async () => { + originalNodeEnv = process.env.NODE_ENV; + // Restore the default `getWorkspaceClient(() => mockClient)` mock in + // case a previous test in this block called `useRealGetWorkspaceClient` + // — Vitest's `vi.clearAllMocks` clears call history but not impls. + const ctx = await import("../../../context"); + vi.mocked(ctx.getWorkspaceClient).mockImplementation( + () => mockClient as any, + ); + }); + + afterEach(() => { + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + }); + + test("asUser on SP-configured volume → SDK call goes through user-token client (hard override)", async () => { + // Use the real ALS-driven `getWorkspaceClient` so that calls inside + // `runInUserContext` resolve to the wrapped UserContext's client and + // calls outside fall through to ServiceContext.client (which lacks + // `.files` — making any leak through the SP path crash loudly). + await useRealGetWorkspaceClient(); + + // Distinct user-token client whose `listDirectoryContents` is the + // one we expect `asUser` to route through. + const userListSpy = vi.fn(async function* () { + yield { name: "user.txt", path: "/user.txt", is_directory: false }; + }); + const userClient = { + config: { + host: "https://test.databricks.com", + authenticate: vi.fn(), + }, + files: { listDirectoryContents: userListSpy }, + }; + + // Wire `_buildUserContextOrNull → ServiceContext.createUserContext` to + // return a UserContext whose `client` is our user-token client. + serviceContextMock.createUserContextSpy.mockImplementation( + (_token: string, userId: string) => ({ + client: userClient as any, + userId, + warehouseId: serviceContextMock.serviceContext.warehouseId, + workspaceId: serviceContextMock.serviceContext.workspaceId, + isUserContext: true, + }), + ); + + // Set the SP volume's default path BEFORE plugin construction so the + // connector picks it up. + process.env.DATABRICKS_VOLUME_SP_VOL = "/Volumes/c/s/sp"; + try { + const plugin = new FilesPlugin({ + volumes: { + // SP-configured volume (the auth: "service-principal" default). + sp_vol: { auth: "service-principal", policy: policy.allowAll() }, + uploads: {}, + exports: {}, + }, + }); + + const handle = plugin.exports()("sp_vol"); + const userApi = handle.asUser( + mockReq({ + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }), + ); + + // Materialize the async iterator — the connector iterates the SDK + // generator and aggregates results. + await userApi.list("subdir"); + + // The user-token client served the SDK call (proof the + // `runInUserContext` wrap took effect). + expect(userListSpy).toHaveBeenCalledTimes(1); + // Defense-in-depth: a UserContext was constructed for the request. + expect(serviceContextMock.createUserContextSpy).toHaveBeenCalledTimes( + 1, + ); + } finally { + delete process.env.DATABRICKS_VOLUME_SP_VOL; + } + }); + + test("programmatic call on OBO volume without asUser → no runInUserContext wrap; SDK runs through default getWorkspaceClient (SP at top level)", async () => { + // Discovery note: programmatic calls (no `req`) cannot synthesize a + // user identity. The OBO-volume "execute as user" behavior is wired + // through the HTTP route layer's `_runWithAuth`, which builds a + // UserContext from the request headers. There is no equivalent for + // the programmatic path — `appKit.files("obo-vol").list()` (no + // `asUser`) executes against whatever `getWorkspaceClient()` returns + // in the current ALS scope, which at the top level is the SP client. + // This test pins that reality. The task spec's framing of "OBO + // default executes as user via volume default" is HTTP-only. + + // We rely on the default mocked `getWorkspaceClient(() => mockClient)` + // so the SP client returned is the file-aware fixture. + const spListSpy = vi.fn(async function* () { + yield { name: "sp.txt", path: "/sp.txt", is_directory: false }; + }); + mockClient.files.listDirectoryContents.mockImplementation(spListSpy); + + // Spy on createUserContext to confirm no wrap happened. + serviceContextMock.createUserContextSpy.mockClear(); + + process.env.DATABRICKS_VOLUME_OBO_VOL = "/Volumes/c/s/obo"; + try { + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { + auth: "on-behalf-of-user", + policy: policy.allowAll(), + }, + uploads: {}, + exports: {}, + }, + }); + + const handle = plugin.exports()("obo_vol"); + await handle.list("subdir"); // no asUser; pure programmatic call + + // The default `mockClient` (SP fixture) served the call. No + // user-context wrap exists at the top of the call stack — the OBO + // semantic only applies on the HTTP route path. + expect(spListSpy).toHaveBeenCalledTimes(1); + expect(serviceContextMock.createUserContextSpy).not.toHaveBeenCalled(); + } finally { + delete process.env.DATABRICKS_VOLUME_OBO_VOL; + } + }); + + test("programmatic call on SP volume without asUser → SDK runs as SP; runInUserContext is not invoked (status-quo guard)", async () => { + // Default mocked `getWorkspaceClient(() => mockClient)`; no + // ALS-driven dispatch. + const spListSpy = vi.fn(async function* () { + yield { name: "sp.txt", path: "/sp.txt", is_directory: false }; + }); + mockClient.files.listDirectoryContents.mockImplementation(spListSpy); + + serviceContextMock.createUserContextSpy.mockClear(); + + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const handle = plugin.exports()("uploads"); + await handle.list("subdir"); // no asUser; pure SP path + + // The SP client served the SDK call. + expect(spListSpy).toHaveBeenCalledTimes(1); + // No UserContext was ever built — the SP default path shouldn't go + // anywhere near `createUserContext`. + expect(serviceContextMock.createUserContextSpy).not.toHaveBeenCalled(); + }); + + /** + * Fix 1 regression: in production, `asUser(req)` MUST require both + * `x-forwarded-user` and `x-forwarded-access-token`. The previous + * implementation only required the user header. With user-only: + * - `_extractUser` returned `{ id: "alice" }` (no isServicePrincipal) + * - `_buildUserContextOrNull` returned `null` (token missing) + * - `asUser` returned the SP-wrapped API (no runInUserContext) + * Net effect: policy saw alice as a real user, SDK ran with SP creds. + * This test pins the new strict-token contract. + */ + test("asUser in production with x-forwarded-user but no x-forwarded-access-token throws AuthenticationError.missingToken (privilege-confusion guard)", () => { + process.env.NODE_ENV = "production"; + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const handle = plugin.exports()("uploads"); + const reqWithUserOnly = { + header: (name: string) => + ({ "x-forwarded-user": "alice@example.com" })[name.toLowerCase()], + } as any; + + expect(() => handle.asUser(reqWithUserOnly)).toThrow(AuthenticationError); + try { + handle.asUser(reqWithUserOnly); + } catch (err) { + expect(err).toBeInstanceOf(AuthenticationError); + expect((err as Error).message).toMatch(/x-forwarded-access-token/); + } + }); + + /** + * Fix 1 regression — dev mode: with only `x-forwarded-user` and no + * `x-forwarded-access-token`, asUser must NOT silently expose the SP + * client to the user identity. The dev-fallback returns a policy user + * marked `isServicePrincipal: true` so a `usersOnly` policy that gates + * on `!user.isServicePrincipal` still fails closed (just like + * production does, with a 401 there). + */ + test("asUser in development with x-forwarded-user but no x-forwarded-access-token returns SP-marked policy user; SDK runs as SP", async () => { + process.env.NODE_ENV = "development"; + const policySpy = vi.fn().mockReturnValue(true); + + // Use the default mocked SP client; spy on its listDirectoryContents + // to confirm the SDK call resolved against the SP path. + const spListSpy = vi.fn(async function* () { + yield { name: "sp.txt", path: "/sp.txt", is_directory: false }; + }); + mockClient.files.listDirectoryContents.mockImplementation(spListSpy); + + serviceContextMock.createUserContextSpy.mockClear(); + + const plugin = new FilesPlugin({ + volumes: { + uploads: { policy: policySpy }, + exports: {}, + }, + }); + const handle = plugin.exports()("uploads"); + + const reqWithUserOnly = { + header: (name: string) => + ({ "x-forwarded-user": "alice@example.com" })[name.toLowerCase()], + } as any; + + const userApi = handle.asUser(reqWithUserOnly); + await userApi.list(); + + // Policy received an explicitly SP-marked identity — a usersOnly + // policy gating on !isServicePrincipal would correctly fail closed. + expect(policySpy).toHaveBeenCalledTimes(1); + const userArg = policySpy.mock.calls[0][2]; + expect(userArg.isServicePrincipal).toBe(true); + + // SDK ran via the SP client (no UserContext was built — the + // dev-fallback skips runInUserContext). + expect(spListSpy).toHaveBeenCalledTimes(1); + expect(serviceContextMock.createUserContextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("files.auth_mode telemetry attribute", () => { + /** + * Spy on the plugin's telemetry provider AND every per-volume + * `FilesConnector.telemetry` provider so we can inspect every + * `startActiveSpan` invocation. + * + * The plugin's spans come from the `TelemetryInterceptor` (HTTP route + * path). Programmatic calls go through the connector's `traced()` + * decorator, which creates `files.` spans on the connector's own + * provider — and per Fix 4 the auth-mode attribute is now propagated + * onto THAT span via AsyncLocalStorage (no duplicate parent span on + * the plugin's provider). The test asserts on the merged call list. + */ + function spyOnTelemetry(plugin: FilesPlugin) { + const calls: Array<{ + name: string; + attributes: Record | undefined; + }> = []; + + const wire = (telemetry: { + startActiveSpan: (...args: unknown[]) => Promise; + }) => { + const original = telemetry.startActiveSpan.bind(telemetry); + vi.spyOn(telemetry, "startActiveSpan").mockImplementation( + (...args: unknown[]) => { + const [name, options] = args as [ + string, + { attributes?: Record } | undefined, + ]; + calls.push({ name, attributes: options?.attributes }); + return original(...args); + }, + ); + }; + + wire((plugin as any).telemetry); + const connectors = (plugin as any).volumeConnectors as Record< + string, + { telemetry: any } + >; + for (const conn of Object.values(connectors)) { + wire(conn.telemetry); + } + + return calls; + } + + function getRouteHandler( + plugin: FilesPlugin, + method: "get" | "post" | "delete", + pathSuffix: string, + ) { + const mockRouter = { + use: vi.fn(), + get: vi.fn(), + post: vi.fn(), + put: vi.fn(), + delete: vi.fn(), + patch: vi.fn(), + } as any; + plugin.injectRoutes(mockRouter); + const call = mockRouter[method].mock.calls.find( + (c: unknown[]) => + typeof c[0] === "string" && (c[0] as string).endsWith(pathSuffix), + ); + return call[call.length - 1] as (req: any, res: any) => Promise; + } + + function mockRes() { + const res: any = { headersSent: false }; + res.status = vi.fn().mockReturnValue(res); + res.json = vi.fn().mockReturnValue(res); + res.type = vi.fn().mockReturnValue(res); + res.send = vi.fn().mockReturnValue(res); + res.setHeader = vi.fn().mockReturnValue(res); + res.end = vi.fn(); + return res; + } + + function mockReq( + volumeKey: string, + headers: Record, + overrides: Record = {}, + ) { + return { + params: { volumeKey }, + query: {}, + ...overrides, + headers, + header: (name: string) => headers[name.toLowerCase()], + }; + } + + let originalNodeEnv: string | undefined; + + beforeEach(() => { + originalNodeEnv = process.env.NODE_ENV; + process.env.DATABRICKS_VOLUME_OBO_VOL = "/Volumes/c/s/obo"; + }); + + afterEach(() => { + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + delete process.env.DATABRICKS_VOLUME_OBO_VOL; + }); + + test("OBO volume + HTTP route + valid token → span attribute is 'on-behalf-of-user'", async () => { + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policy.allowAll() }, + uploads: {}, + exports: {}, + }, + }); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handler = getRouteHandler(plugin, "get", "/list"); + await handler( + mockReq("obo_vol", { + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + // The span created by TelemetryInterceptor must carry the OBO marker. + const obo = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "on-behalf-of-user", + ); + expect(obo).toBeDefined(); + }); + + test("SP volume + HTTP route → span attribute is 'service-principal'", async () => { + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handler = getRouteHandler(plugin, "get", "/list"); + await handler(mockReq("uploads", {}), mockRes()); + + // No OBO span attribute should appear on the SP route path. + const oboCall = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "on-behalf-of-user", + ); + expect(oboCall).toBeUndefined(); + + // At least one span must explicitly tag service-principal. + const sp = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "service-principal", + ); + expect(sp).toBeDefined(); + }); + + test("appKit.files('sp-vol').asUser(req).list() programmatic → span attribute is 'on-behalf-of-user' (asUser forces it on SP volumes)", async () => { + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handle = plugin.exports()("uploads"); // SP-mode volume + const userApi = handle.asUser({ + header: (name: string) => + ({ + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + })[name.toLowerCase()], + } as any); + await userApi.list("subdir"); + + // asUser hard-overrides SP into OBO, so the programmatic span must + // carry the OBO marker. + const obo = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "on-behalf-of-user", + ); + expect(obo).toBeDefined(); + expect(obo?.name).toBe("files.list"); + }); + + /** + * Fix 4 regression: programmatic calls produce exactly ONE + * `files.` span (the connector's), not two. The previous + * `_withAuthModeSpan` helper opened a duplicate parent span with the + * same name, doubling allocation/export. The current implementation + * propagates `files.auth_mode` onto the connector's existing span via + * AsyncLocalStorage. This test pins span count == 1. + */ + test("programmatic asUser().list() produces exactly ONE files.list span (no duplicate parent span)", async () => { + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handle = plugin.exports()("uploads"); + const userApi = handle.asUser({ + header: (name: string) => + ({ + "x-forwarded-access-token": "alice-token", + "x-forwarded-user": "alice@example.com", + })[name.toLowerCase()], + } as any); + await userApi.list("subdir"); + + const filesListSpans = calls.filter((c) => c.name === "files.list"); + // Pre-fix: 2 (outer auth-mode span + inner connector span). Now: 1. + expect(filesListSpans).toHaveLength(1); + // The single span must carry the auth_mode attribute. + expect(filesListSpans[0].attributes?.["files.auth_mode"]).toBe( + "on-behalf-of-user", + ); + }); + + test("programmatic SP-volume .list() produces exactly ONE files.list span tagged service-principal", async () => { + const plugin = new FilesPlugin(VOLUMES_CONFIG); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handle = plugin.exports()("uploads"); + await handle.list("subdir"); // no asUser → SP path + + const filesListSpans = calls.filter((c) => c.name === "files.list"); + expect(filesListSpans).toHaveLength(1); + expect(filesListSpans[0].attributes?.["files.auth_mode"]).toBe( + "service-principal", + ); + }); + + test("OBO volume + HTTP route + dev fallback (no token) → span attribute is 'service-principal'", async () => { + // Dev-fallback path: OBO volume, but no x-forwarded-access-token, so + // `_buildUserContextOrNull` returns null and `_runWithAuth` falls + // through to direct SP execution. The span must reflect what + // operationally happened (SP), not what the volume is configured + // for (OBO). + process.env.NODE_ENV = "development"; + + const plugin = new FilesPlugin({ + volumes: { + obo_vol: { auth: "on-behalf-of-user", policy: policy.allowAll() }, + uploads: {}, + exports: {}, + }, + }); + const calls = spyOnTelemetry(plugin); + + mockClient.files.listDirectoryContents.mockImplementation( + async function* () { + yield { name: "f.txt", path: "/f.txt", is_directory: false }; + }, + ); + + const handler = getRouteHandler(plugin, "get", "/list"); + // Provide x-forwarded-user (so `_extractObiUser` accepts the dev + // fallback identity) but no x-forwarded-access-token. + await handler( + mockReq("obo_vol", { + "x-forwarded-user": "alice@example.com", + }), + mockRes(), + ); + + // Span must NOT be tagged on-behalf-of-user when no user context was + // built (dev-fallback runs as SP). + const obo = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "on-behalf-of-user", + ); + expect(obo).toBeUndefined(); + + const sp = calls.find( + (c) => c.attributes?.["files.auth_mode"] === "service-principal", + ); + expect(sp).toBeDefined(); + }); + }); + describe("Upload Stream Size Limiter", () => { test("stream under limit passes through all chunks", async () => { const maxSize = 100; diff --git a/packages/appkit/src/plugins/files/types.ts b/packages/appkit/src/plugins/files/types.ts index 82b546886..d912efff9 100644 --- a/packages/appkit/src/plugins/files/types.ts +++ b/packages/appkit/src/plugins/files/types.ts @@ -8,6 +8,12 @@ import type { FilePolicy } from "./policy"; export interface VolumeConfig { /** Maximum upload size in bytes for this volume. Inherits from plugin-level `maxUploadSize` if not set. */ maxUploadSize?: number; + /** + * Maximum byte length the `/read` endpoint will stream before aborting with + * `413 Payload Too Large`. Inherits from plugin-level `maxReadSize` if not + * set; defaults to 10 MB. `/download` and `/raw` are unaffected. + */ + maxReadSize?: number; /** Map of file extensions to MIME types for this volume. Inherits from plugin-level `customContentTypes` if not set. */ customContentTypes?: Record; /** @@ -15,12 +21,71 @@ export interface VolumeConfig { * service principal and the policy decides whether the action is allowed. */ policy?: FilePolicy; + /** + * Per-volume auth mode. When `"on-behalf-of-user"`, HTTP route handlers + * execute Unity Catalog SDK operations as the end user (using the + * `x-forwarded-access-token` and `x-forwarded-user` headers injected by + * the Databricks Apps reverse proxy) instead of the service principal. + * Inherits from `IFilesConfig.auth` if not set; defaults to + * `"service-principal"`. + * + * **Permission scope**: + * - `"service-principal"`: the app's SP needs `WRITE_VOLUME` (or + * read-equivalent) on the UC volume. + * - `"on-behalf-of-user"`: each end user needs `WRITE_VOLUME` (or + * read-equivalent) on the UC volume; the SP itself does not need + * direct volume permissions. + * + * In production, OBO requests with a missing `x-forwarded-access-token` + * return `401`. In development (`NODE_ENV === "development"`) they fall + * back to the SP with a single warning so local testing without a + * reverse proxy continues to work. + * + * @example Service-principal volume (default) + * ```ts + * files({ + * volumes: { + * exports: { + * auth: "service-principal", // explicit; same as omitting + * policy: files.policy.publicRead(), + * }, + * }, + * }); + * ``` + * + * @example On-behalf-of-user volume + * ```ts + * files({ + * volumes: { + * "user-uploads": { + * auth: "on-behalf-of-user", + * // Policies see the real end user identity here. + * policy: (action, _resource, user) => + * !user.isServicePrincipal, + * }, + * }, + * }); + * ``` + */ + auth?: "service-principal" | "on-behalf-of-user"; } /** * User-facing API for a single volume. - * All operations execute as the service principal. When a policy is - * configured on the volume, every call is checked against that policy. + * + * Which identity executes each operation depends on the volume's effective + * `auth` mode (resolved from `VolumeConfig.auth` ?? `IFilesConfig.auth` ?? + * `"service-principal"`): + * - SP volumes (`auth: "service-principal"`): operations execute as the + * service principal. + * - OBO volumes (`auth: "on-behalf-of-user"`): operations invoked through + * the HTTP routes execute as the end user (the token from + * `x-forwarded-access-token` is used to build the SDK client). For + * programmatic calls outside an HTTP route, see `VolumeHandle.asUser(req)` + * to opt into per-user execution explicitly. + * + * When a policy is configured on the volume, every call is checked against + * that policy with the appropriate identity (service principal vs end user). */ export interface VolumeAPI { list(directoryPath?: string): Promise; @@ -50,6 +115,47 @@ export interface IFilesConfig extends BasePluginConfig { customContentTypes?: Record; /** Maximum upload size in bytes. Defaults to 5 GB (Databricks Files API v2 limit). */ maxUploadSize?: number; + /** + * Plugin-level default for the `/read` endpoint's response size cap. + * Inherited by volumes without their own `maxReadSize`. Defaults to 10 MB. + */ + maxReadSize?: number; + /** + * Plugin-level default auth mode for all volumes. Volumes without an + * explicit `auth` field inherit this default; volumes that set their own + * `VolumeConfig.auth` override it. Defaults to `"service-principal"` if + * not set. + * + * Resolution order (per volume): + * `VolumeConfig.auth` > `IFilesConfig.auth` > `"service-principal"`. + * + * @example Mark every volume OBO unless explicitly overridden + * ```ts + * files({ + * auth: "on-behalf-of-user", + * volumes: { + * // Inherits "on-behalf-of-user" from the plugin-level default. + * "user-uploads": { policy: files.policy.allowAll() }, + * // Overrides the plugin default to run as the SP. + * reports: { + * auth: "service-principal", + * policy: files.policy.publicRead(), + * }, + * }, + * }); + * ``` + * + * @example Default to service-principal (the implicit default) + * ```ts + * files({ + * // No `auth` set → all volumes default to "service-principal". + * volumes: { + * exports: { policy: files.policy.publicRead() }, + * }, + * }); + * ``` + */ + auth?: "service-principal" | "on-behalf-of-user"; } /** A single entry returned when listing a directory. Re-exported from `@databricks/sdk-experimental`. */ @@ -85,9 +191,34 @@ export interface FilePreview extends FileMetadata { /** * Volume handle returned by `app.files("volumeKey")`. * - * All methods execute as the service principal and enforce the volume's - * policy (if configured) with `{ isServicePrincipal: true }`. - * `asUser(req)` re-wraps with the real user identity for per-user policy checks. + * Default execution identity follows the volume's effective `auth` mode: + * - SP volumes (`auth: "service-principal"`): methods execute as the service + * principal and the volume policy (if configured) sees + * `{ isServicePrincipal: true }`. + * - OBO volumes (`auth: "on-behalf-of-user"`): methods invoked from inside + * an HTTP route handler execute as the end user (the route wires the + * request token into a `runInUserContext` scope before calling SDK code). + * + * `asUser(req)` is a hard override: regardless of the volume's `auth` + * setting (SP or OBO), the returned API runs every SDK call inside + * `runInUserContext` with the user identity extracted from the request, + * so the underlying `WorkspaceClient` is the user-token client. This makes + * `appKit.files("sp-vol").asUser(req).list()` actually execute as the end + * user, even on a service-principal-configured volume. + * + * In production `asUser` throws `AuthenticationError.missingToken` when the + * `x-forwarded-user` header is missing. In development + * (`NODE_ENV === "development"`) it logs a warning and falls back to the + * service principal so local testing without a reverse proxy continues to + * work — in that fallback no `runInUserContext` wrap is applied and SDK + * calls execute as the SP, identical to pre-OBO behavior. + * + * @remarks Behavior change: prior to OBO support, `asUser(req)` only + * influenced the policy user passed to the volume policy — the underlying + * SDK call still ran as the service principal. With OBO support landed, + * `asUser` now also forces the SDK call to run as the user. Any caller + * that relied on the pre-OBO behavior (policy sees user, SDK runs as SP) + * must remove the `asUser` wrap. */ export type VolumeHandle = VolumeAPI & { asUser: (req: IAppRequest) => VolumeAPI;