Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
59d70a9
Add INLINE + ARROW_STREAM format support for analytics plugin
jamesbroadhead Apr 3, 2026
b1566ea
Add tests for ARROW_STREAM and ARROW format parameter handling
jamesbroadhead Apr 3, 2026
dbe8ea3
fix: propagate ARROW_STREAM format to UI layer and typegen
jamesbroadhead Apr 3, 2026
8fe05d8
fix: default analytics format to ARROW_STREAM for broadest warehouse …
jamesbroadhead Apr 3, 2026
4725c97
feat: automatic format fallback for warehouse compatibility
jamesbroadhead Apr 3, 2026
a4ad7b0
fix: handle ARROW_STREAM + INLINE data in _transformDataArray
jamesbroadhead Apr 14, 2026
1e17f5f
feat: decode inline Arrow IPC attachments from serverless warehouses
jamesbroadhead Apr 14, 2026
055cd41
test: add 147 tests for service-context, stream-registry, genie conne…
jamesbroadhead Apr 15, 2026
a003274
refactor: use API enum names (JSON_ARRAY, ARROW_STREAM) and simplify …
jamesbroadhead Apr 16, 2026
2351f38
fix: address ACE multi-model review findings
jamesbroadhead Apr 27, 2026
997d6a7
fix: keep ARROW_STREAM contract consistent across INLINE/EXTERNAL_LINKS
jamesbroadhead Apr 27, 2026
694feed
test: drop unrelated files-plugin upload tests from this PR
jamesbroadhead Apr 27, 2026
e1e9017
fix: handle ARROW_STREAM attachment in type generator
jamesbroadhead Apr 28, 2026
003a309
fix: address ACE iter-2 review findings
jamesbroadhead Apr 28, 2026
cf50679
fix: synthesize empty Arrow IPC for empty ARROW_STREAM responses
jamesbroadhead Apr 28, 2026
aef9042
fix: align SSE event size with inline Arrow attachment cap (8 MiB)
jamesbroadhead Apr 28, 2026
39f971c
fix: address ACE iter-2 review findings (round 2)
jamesbroadhead Apr 28, 2026
3d32491
fix: preserve structured error_code through ExecutionError wrapping
jamesbroadhead Apr 28, 2026
3951a00
feat: zod-validated wire protocol for analytics SSE messages
jamesbroadhead Apr 28, 2026
cc9dff9
test: tighten typegen malformed-attachment assertions
jamesbroadhead Apr 28, 2026
7066faa
fix: ci — zod v4 z.record() arity + regenerated ExecutionError docs
jamesbroadhead Apr 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions docs/docs/api/appkit/Class.ExecutionError.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ throw new ExecutionError("Statement was canceled");
new ExecutionError(message: string, options?: {
cause?: Error;
context?: Record<string, unknown>;
errorCode?: string;
}): ExecutionError;
```

Expand All @@ -30,15 +31,16 @@ new ExecutionError(message: string, options?: {
| Parameter | Type |
| ------ | ------ |
| `message` | `string` |
| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; \} |
| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; `errorCode?`: `string`; \} |
| `options.cause?` | `Error` |
| `options.context?` | `Record`\<`string`, `unknown`\> |
| `options.errorCode?` | `string` |

#### Returns

`ExecutionError`

#### Inherited from
#### Overrides

[`AppKitError`](Class.AppKitError.md).[`constructor`](Class.AppKitError.md#constructor)

Expand Down Expand Up @@ -86,6 +88,19 @@ Additional context for the error

***

### errorCode?

```ts
readonly optional errorCode: string;
```

Structured error code from the upstream source (typically the warehouse's
`error_code` for statement-level failures, or the SDK's `ApiError.errorCode`
for HTTP failures). Preserved through wrapping so callers can branch on a
stable identifier without substring-matching the message.

***

### isRetryable

```ts
Expand Down Expand Up @@ -202,16 +217,17 @@ Create an execution error for closed/expired results
### statementFailed()

```ts
static statementFailed(errorMessage?: string): ExecutionError;
static statementFailed(errorMessage?: string, errorCode?: string): ExecutionError;
```

Create an execution error for statement failure
Create an execution error for statement failure.

#### Parameters

| Parameter | Type |
| ------ | ------ |
| `errorMessage?` | `string` |
| Parameter | Type | Description |
| ------ | ------ | ------ |
| `errorMessage?` | `string` | Human-readable error from the warehouse / SDK. |
| `errorCode?` | `string` | Structured code (e.g. "INVALID_PARAMETER_VALUE") to preserve through wrapping. Optional. |

#### Returns

Expand Down
5 changes: 4 additions & 1 deletion packages/appkit-ui/src/js/sse/connect-sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ export async function connectSSE<Payload = unknown>(
lastEventId: initialLastEventId = null,
retryDelay = 2000,
maxRetries = 3,
maxBufferSize = 1024 * 1024, // 1MB
// 8 MiB — sized to receive inline Arrow IPC attachments from
// ARROW_STREAM analytics responses; matches the server's stream
// `maxEventSize`. Most events are well under 1 MiB in practice.
maxBufferSize = 8 * 1024 * 1024,
timeout = 300000, // 5 minutes
onError,
} = options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ describe("isQueryProps", () => {
const props = {
queryKey: "test_query",
parameters: { limit: 100 },
format: "json" as const,
format: "json_array" as const,
};

expect(isQueryProps(props as any)).toBe(true);
Expand Down
6 changes: 3 additions & 3 deletions packages/appkit-ui/src/react/charts/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow";
// ============================================================================

/** Supported data formats for analytics queries */
export type DataFormat = "json" | "arrow" | "auto";
export type DataFormat = "json_array" | "arrow_stream" | "auto";

/** Chart orientation */
export type Orientation = "vertical" | "horizontal";
Expand Down Expand Up @@ -77,8 +77,8 @@ export interface QueryProps extends ChartBaseProps {
parameters?: Record<string, unknown>;
/**
* Data format to use
* - "json": Use JSON format (smaller payloads, simpler)
* - "arrow": Use Arrow format (faster for large datasets)
* - "json_array": Use JSON format (smaller payloads, simpler)
* - "arrow_stream": Use Arrow format (faster for large datasets)
* - "auto": Automatically select based on expected data size
* @default "auto"
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { renderHook, waitFor } from "@testing-library/react";
import { beforeEach, describe, expect, test, vi } from "vitest";

// Capture the onMessage handler so tests can drive SSE messages directly.
let lastConnectArgs: any = null;
const mockProcessArrowBuffer = vi.fn();
const mockFetchArrow = vi.fn();

vi.mock("@/js", () => ({
connectSSE: vi.fn((args: any) => {
lastConnectArgs = args;
return () => {};
}),
ArrowClient: {
fetchArrow: (...args: unknown[]) => mockFetchArrow(...args),
processArrowBuffer: (...args: unknown[]) => mockProcessArrowBuffer(...args),
},
}));

// useQueryHMR is a no-op shim for tests; mock to avoid HMR side effects.
vi.mock("../use-query-hmr", () => ({
useQueryHMR: vi.fn(),
}));

import { useAnalyticsQuery } from "../use-analytics-query";

describe("useAnalyticsQuery", () => {
beforeEach(() => {
vi.clearAllMocks();
lastConnectArgs = null;
});

test("decodes arrow_inline base64 attachment via ArrowClient.processArrowBuffer", async () => {
const fakeTable = { numRows: 1, schema: { fields: [] } };
mockProcessArrowBuffer.mockResolvedValueOnce(fakeTable);

// 'AQID' decodes to bytes [1, 2, 3].
const base64 = "AQID";

const { result } = renderHook(() =>
useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }),
);

// Drive the SSE onMessage handler with an arrow_inline payload.
await lastConnectArgs.onMessage({
data: JSON.stringify({ type: "arrow_inline", attachment: base64 }),
});

await waitFor(() => {
expect(result.current.data).toBe(fakeTable);
});

expect(mockProcessArrowBuffer).toHaveBeenCalledTimes(1);
const passedBuffer = mockProcessArrowBuffer.mock.calls[0][0] as Uint8Array;
expect(passedBuffer).toBeInstanceOf(Uint8Array);
expect(Array.from(passedBuffer)).toEqual([1, 2, 3]);
// Inline path must NOT trigger a network fetch.
expect(mockFetchArrow).not.toHaveBeenCalled();
});

test("surfaces an error when arrow_inline decode fails", async () => {
mockProcessArrowBuffer.mockRejectedValueOnce(new Error("bad ipc"));

const { result } = renderHook(() =>
useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }),
);

await lastConnectArgs.onMessage({
data: JSON.stringify({ type: "arrow_inline", attachment: "AQID" }),
});

await waitFor(() => {
expect(result.current.error).toBe(
"Unable to load data, please try again",
);
});
expect(result.current.loading).toBe(false);
});

test("rejects arrow_inline with missing/empty/non-string attachment without crashing atob", async () => {
const cases: Array<unknown> = [undefined, null, "", 123, { foo: "bar" }];

for (const attachment of cases) {
mockProcessArrowBuffer.mockClear();
const { result, unmount } = renderHook(() =>
useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }),
);

await lastConnectArgs.onMessage({
data: JSON.stringify({ type: "arrow_inline", attachment }),
});

await waitFor(() => {
expect(result.current.error).toBe(
"Unable to load data, please try again",
);
});
// Critically: must NOT call processArrowBuffer (or atob) on the bad input.
expect(mockProcessArrowBuffer).not.toHaveBeenCalled();

unmount();
}
});

test("rejects oversized arrow_inline attachment without allocating a huge buffer", async () => {
// Base64 string that would decode to ~9 MiB (>8 MiB cap). The hook
// should reject before calling decodeBase64 / processArrowBuffer.
const oversized = "A".repeat(13 * 1024 * 1024);

const { result } = renderHook(() =>
useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }),
);

await lastConnectArgs.onMessage({
data: JSON.stringify({ type: "arrow_inline", attachment: oversized }),
});

await waitFor(() => {
expect(result.current.error).toBe(
"Unable to load data, please try again",
);
});
expect(mockProcessArrowBuffer).not.toHaveBeenCalled();
});

test("still handles type:result rows for JSON_ARRAY", async () => {
const { result } = renderHook(() =>
useAnalyticsQuery("q", null, { format: "JSON_ARRAY" }),
);

await lastConnectArgs.onMessage({
data: JSON.stringify({
type: "result",
data: [{ id: 1 }, { id: 2 }],
}),
});

await waitFor(() => {
expect(result.current.data).toEqual([{ id: 1 }, { id: 2 }]);
});
expect(mockProcessArrowBuffer).not.toHaveBeenCalled();
});
});
Loading
Loading