diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 3b57014c0..04232f88b 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -69,6 +69,7 @@ "@opentelemetry/sdk-trace-base": "2.6.0", "@opentelemetry/semantic-conventions": "1.38.0", "@types/semver": "7.7.1", + "apache-arrow": "21.1.0", "dotenv": "16.6.1", "express": "4.22.0", "obug": "2.1.1", diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index 4ab9344e8..80d0aecb7 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -3,6 +3,7 @@ import { type sql, type WorkspaceClient, } from "@databricks/sdk-experimental"; +import { tableFromIPC } from "apache-arrow"; import type { TelemetryOptions } from "shared"; import { AppKitError, @@ -25,6 +26,42 @@ import { executeStatementDefaults } from "./defaults"; const logger = createLogger("connectors:sql-warehouse"); +/** + * Recursively converts BigInt values to strings so that Arrow IPC rows can be + * passed through `JSON.stringify` (used by the SSE stream encoder). Also + * flattens apache-arrow `Vector`s (which `row.toJSON()` leaves in place for + * LIST columns) into plain arrays — both so the result is valid JSON and so + * downstream consumers see the same shape as the legacy data_array path. + */ +function deepStringifyBigInts(value: unknown): unknown { + if (typeof value === "bigint") { + return value.toString(); + } + if (Array.isArray(value)) { + return value.map(deepStringifyBigInts); + } + if ( + value !== null && + typeof value === "object" && + typeof (value as { toArray?: unknown }).toArray === "function" + ) { + // Apache Arrow Vector — produced by `row.toJSON()` for LIST columns. 
+ // `Array.from` (rather than `.toArray()`) so we get a plain Array we can + // map over: `.toArray()` on an Int64 vector returns a `BigInt64Array` + // typed array, which preserves its element type through `.map()` and + // would re-throw on bigint→string assignment. + return Array.from(value as Iterable<unknown>).map(deepStringifyBigInts); + } + if (value !== null && typeof value === "object") { + const out: Record<string, unknown> = {}; + for (const [k, v] of Object.entries(value as Record<string, unknown>)) { + out[k] = deepStringifyBigInts(v); + } + return out; + } + return value; +} + interface SQLWarehouseConfig { timeout?: number; telemetry?: TelemetryOptions; @@ -392,6 +429,16 @@ export class SQLWarehouseConnector { } private _transformDataArray(response: sql.StatementResponse) { + // Serverless warehouses return ARROW_STREAM with INLINE disposition: the + // data is base64 Arrow IPC under `result.attachment` instead of + // `result.data_array`. Decode it before falling through to the legacy + // ARROW_STREAM branch (which only handles EXTERNAL_LINKS). + const attachment = (response.result as undefined | { attachment?: string }) + ?.attachment; + if (attachment) { + return this._transformArrowAttachment(response, attachment); + } + if (response.manifest?.format === "ARROW_STREAM") { return this.updateWithArrowStatus(response); } @@ -439,6 +486,41 @@ export class SQLWarehouseConnector { }; } + /** + * Decode a base64 Arrow IPC attachment into row objects. + * Some serverless warehouses return inline results as Arrow IPC in + * `result.attachment` rather than `result.data_array`. + */ + private _transformArrowAttachment( + response: sql.StatementResponse, + attachment: string, + ) { + let data: unknown[]; + try { + const buf = Buffer.from(attachment, "base64"); + const table = tableFromIPC(buf); + data = table.toArray().map((row) => deepStringifyBigInts(row.toJSON())); + } catch (err) { + throw ExecutionError.statementFailed( + `Failed to decode Arrow IPC attachment: ${ + err instanceof Error ? 
err.message : String(err) + }`, + ); + } + + const result = response.result as + | (NonNullable<sql.StatementResponse["result"]> & { attachment?: string }) + | undefined; + const { attachment: _att, ...restResult } = result ?? {}; + return { + ...response, + result: { + ...restResult, + data, + }, + }; + } + private updateWithArrowStatus(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts new file mode 100644 index 000000000..0e54370b8 --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts @@ -0,0 +1,141 @@ +import type { sql } from "@databricks/sdk-experimental"; +import { Int64, Table, tableToIPC, vectorFromArray } from "apache-arrow"; +import { describe, expect, it } from "vitest"; +import { SQLWarehouseConnector } from "../client"; + +function arrowAttachment(table: Table): string { + return Buffer.from(tableToIPC(table, "stream")).toString("base64"); +} + +function callTransform( + client: SQLWarehouseConnector, + response: sql.StatementResponse, +): sql.StatementResponse { + // _transformDataArray is the private dispatcher we want to exercise; tests + // intentionally reach in to validate the attachment-decoding path is wired. 
+ const fn = ( + client as unknown as { + _transformDataArray: (r: sql.StatementResponse) => sql.StatementResponse; + } + )._transformDataArray.bind(client); + return fn(response); +} + +describe("SQLWarehouseConnector._transformDataArray (Arrow IPC attachment)", () => { + const client = new SQLWarehouseConnector({}); + + it("decodes inline ARROW_STREAM attachments and stringifies BigInt columns", () => { + const table = new Table({ + id: vectorFromArray([1n, 2n], new Int64()), + name: vectorFromArray(["alice", "bob"]), + count: vectorFromArray([100n, 250n], new Int64()), + }); + + const response = { + manifest: { format: "ARROW_STREAM" }, + result: { attachment: arrowAttachment(table) }, + } as unknown as sql.StatementResponse; + + const out = callTransform(client, response) as unknown as { + result: { data: Array<Record<string, unknown>>; attachment?: string }; + }; + + expect(out.result.attachment).toBeUndefined(); + expect(out.result.data).toHaveLength(2); + expect(out.result.data[0]).toEqual({ + id: "1", + name: "alice", + count: "100", + }); + expect(out.result.data[1]).toEqual({ id: "2", name: "bob", count: "250" }); + + // Crucial: the rows must round-trip through JSON without throwing. + expect(() => JSON.stringify(out.result.data)).not.toThrow(); + }); + + it("recursively stringifies BigInts inside list columns and unwraps Arrow Vectors", () => { + // LIST columns: apache-arrow infers the list child type from the + // bigint elements, so this round-trips through IPC as List<Int64>. 
+ const table = new Table({ + ids: vectorFromArray([ + [1n, 2n, 3n], + [4n, 5n], + ]), + label: vectorFromArray(["x", "y"]), + }); + + const response = { + manifest: { format: "ARROW_STREAM" }, + result: { attachment: arrowAttachment(table) }, + } as unknown as sql.StatementResponse; + + const out = callTransform(client, response) as unknown as { + result: { data: Array<Record<string, unknown>> }; + }; + + // Arrow Vector children must be flattened to plain arrays AND every nested + // BigInt must be stringified so the row survives JSON.stringify. + expect(out.result.data[0].ids).toEqual(["1", "2", "3"]); + expect(out.result.data[1].ids).toEqual(["4", "5"]); + expect(() => JSON.stringify(out.result.data)).not.toThrow(); + }); + + it("falls through to the EXTERNAL_LINKS ARROW_STREAM branch when no attachment", () => { + // No attachment, no data_array: legacy ARROW_STREAM path returns just + // statement_id + status (chunks fetched separately via getArrowData). + const response = { + statement_id: "abc-123", + status: { state: "SUCCEEDED" }, + manifest: { format: "ARROW_STREAM" }, + result: {}, + } as unknown as sql.StatementResponse; + + const out = callTransform(client, response) as unknown as { + result: { statement_id?: string; data?: unknown }; + }; + + expect(out.result.statement_id).toBe("abc-123"); + expect(out.result.data).toBeUndefined(); + }); + + it("throws ExecutionError on malformed Arrow IPC attachment", () => { + const response = { + manifest: { format: "ARROW_STREAM" }, + result: { attachment: "this-is-not-valid-base64-arrow-ipc" }, + } as unknown as sql.StatementResponse; + + expect(() => callTransform(client, response)).toThrow( + /Failed to decode Arrow IPC attachment/, + ); + }); + + it("preserves the legacy JSON_ARRAY (data_array) path unchanged", () => { + const response = { + manifest: { + format: "JSON_ARRAY", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "label", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [ + [1, "a"], 
+ [2, "b"], + ], + }, + } as unknown as sql.StatementResponse; + + const out = callTransform(client, response) as unknown as { + result: { data: Array<Record<string, unknown>>; data_array?: unknown }; + }; + + expect(out.result.data_array).toBeUndefined(); + expect(out.result.data).toEqual([ + { id: 1, label: "a" }, + { id: 2, label: "b" }, + ]); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9ca11b818..46096f433 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -299,6 +299,9 @@ importers: '@types/semver': specifier: 7.7.1 version: 7.7.1 + apache-arrow: + specifier: 21.1.0 + version: 21.1.0 dotenv: specifier: 16.6.1 version: 16.6.1 @@ -5539,7 +5542,7 @@ packages: basic-ftp@5.0.5: resolution: {integrity: sha512-4Bcg1P8xhUuqcii/S0Z9wiHIrQVPMermM1any+MX5GeGD7faD3/msQUDGLol9wOcz4/jbg/WJnGqoJF6LiBdtg==} engines: {node: '>=10.0.0'} - deprecated: Security vulnerability fixed in 5.2.0, please upgrade + deprecated: Security vulnerability fixed in 5.2.1, please upgrade batch@0.6.1: resolution: {integrity: sha512-x+VAiMRL6UPkx+kudNvxTl6hB2XNNCG2r+7wixVfIYwu/2HKRXimwQyaumLjMveWvT2Hkd/cAJw+QBMfJ/EKVw==} @@ -6653,6 +6656,7 @@ packages: dottie@2.0.6: resolution: {integrity: sha512-iGCHkfUc5kFekGiqhe8B/mdaurD+lakO9txNnTvKtA6PISrw86LgqHvRzWYPyoE2Ph5aMIrCw9/uko6XHTKCwA==} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. drizzle-orm@0.45.1: resolution: {integrity: sha512-Te0FOdKIistGNPMq2jscdqngBRfBpC8uMFVwqjf6gtTVJHIQ/dosgV/CLBU2N4ZJBsXL5savCba9b0YJskKdcA==}