Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/appkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"@opentelemetry/sdk-trace-base": "2.6.0",
"@opentelemetry/semantic-conventions": "1.38.0",
"@types/semver": "7.7.1",
"apache-arrow": "21.1.0",
"dotenv": "16.6.1",
"express": "4.22.0",
"obug": "2.1.1",
Expand Down
82 changes: 82 additions & 0 deletions packages/appkit/src/connectors/sql-warehouse/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
type sql,
type WorkspaceClient,
} from "@databricks/sdk-experimental";
import { tableFromIPC } from "apache-arrow";
import type { TelemetryOptions } from "shared";
import {
AppKitError,
Expand All @@ -25,6 +26,42 @@ import { executeStatementDefaults } from "./defaults";

const logger = createLogger("connectors:sql-warehouse");

/**
 * Walks an arbitrary value and returns a JSON-safe copy: every BigInt becomes
 * its decimal string, and apache-arrow `Vector` instances (which `row.toJSON()`
 * leaves in place for LIST columns) are unwrapped into plain arrays. The output
 * matches the shape of the legacy data_array path, so downstream SSE consumers
 * (which run the rows through `JSON.stringify`) see no difference.
 */
function deepStringifyBigInts(value: unknown): unknown {
  if (typeof value === "bigint") {
    return value.toString();
  }

  if (Array.isArray(value)) {
    const converted: unknown[] = [];
    for (const item of value) {
      converted.push(deepStringifyBigInts(item));
    }
    return converted;
  }

  if (value === null || typeof value !== "object") {
    // Primitives (string, number, boolean, undefined, symbol) pass through.
    return value;
  }

  const maybeVector = value as { toArray?: unknown };
  if (typeof maybeVector.toArray === "function") {
    // Apache Arrow Vector (LIST column child). Spread via the iterator rather
    // than calling `.toArray()`: the latter can return a BigInt64Array typed
    // array, whose `.map()` preserves the bigint element type and would throw
    // on bigint -> string assignment.
    return [...(value as Iterable<unknown>)].map(deepStringifyBigInts);
  }

  // Plain object: rebuild it with every property converted recursively.
  const result: Record<string, unknown> = {};
  for (const [key, entry] of Object.entries(value as Record<string, unknown>)) {
    result[key] = deepStringifyBigInts(entry);
  }
  return result;
}

interface SQLWarehouseConfig {
timeout?: number;
telemetry?: TelemetryOptions;
Expand Down Expand Up @@ -392,6 +429,16 @@ export class SQLWarehouseConnector {
}

private _transformDataArray(response: sql.StatementResponse) {
// Serverless warehouses return ARROW_STREAM with INLINE disposition: the
// data is base64 Arrow IPC under `result.attachment` instead of
// `result.data_array`. Decode it before falling through to the legacy
// ARROW_STREAM branch (which only handles EXTERNAL_LINKS).
const attachment = (response.result as undefined | { attachment?: string })
?.attachment;
if (attachment) {
return this._transformArrowAttachment(response, attachment);
}

if (response.manifest?.format === "ARROW_STREAM") {
return this.updateWithArrowStatus(response);
}
Expand Down Expand Up @@ -439,6 +486,41 @@ export class SQLWarehouseConnector {
};
}

/**
 * Decode a base64-encoded Arrow IPC attachment into plain row objects.
 * Some serverless warehouses deliver inline results as Arrow IPC under
 * `result.attachment` rather than populating `result.data_array`.
 *
 * @throws ExecutionError when the attachment is not valid Arrow IPC.
 */
private _transformArrowAttachment(
  response: sql.StatementResponse,
  attachment: string,
) {
  let rows: unknown[];
  try {
    const ipcBytes = Buffer.from(attachment, "base64");
    const arrowTable = tableFromIPC(ipcBytes);
    rows = arrowTable
      .toArray()
      .map((row) => deepStringifyBigInts(row.toJSON()));
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    throw ExecutionError.statementFailed(
      `Failed to decode Arrow IPC attachment: ${reason}`,
    );
  }

  // Strip the (potentially large) base64 payload before returning: consumers
  // only need the decoded rows, in the same `data` slot the legacy path uses.
  const rawResult = response.result as
    | (NonNullable<sql.StatementResponse["result"]> & { attachment?: string })
    | undefined;
  const { attachment: _omitted, ...resultWithoutAttachment } = rawResult ?? {};

  return {
    ...response,
    result: {
      ...resultWithoutAttachment,
      data: rows,
    },
  };
}

private updateWithArrowStatus(response: sql.StatementResponse): {
result: { statement_id: string; status: sql.StatementStatus };
} {
Expand Down
141 changes: 141 additions & 0 deletions packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import type { sql } from "@databricks/sdk-experimental";
import { Int64, Table, tableToIPC, vectorFromArray } from "apache-arrow";
import { describe, expect, it } from "vitest";
import { SQLWarehouseConnector } from "../client";

/** Serialize an Arrow table into the base64 IPC-stream form the warehouse puts in `result.attachment`. */
function arrowAttachment(table: Table): string {
  const ipc = tableToIPC(table, "stream");
  return Buffer.from(ipc).toString("base64");
}

/**
 * Invoke the private `_transformDataArray` dispatcher on a connector.
 * Tests reach into the private API deliberately: the point is to verify that
 * the attachment-decoding path is wired into the dispatcher itself.
 */
function callTransform(
  client: SQLWarehouseConnector,
  response: sql.StatementResponse,
): sql.StatementResponse {
  type WithTransform = {
    _transformDataArray: (r: sql.StatementResponse) => sql.StatementResponse;
  };
  return (client as unknown as WithTransform)._transformDataArray.call(
    client,
    response,
  );
}

// Covers the four dispatcher outcomes: inline Arrow attachment (flat and
// nested LIST columns), legacy EXTERNAL_LINKS fall-through, decode failure,
// and the untouched JSON_ARRAY path.
describe("SQLWarehouseConnector._transformDataArray (Arrow IPC attachment)", () => {
  // Empty config is enough: the transform under test never touches the
  // workspace client or warehouse settings.
  const client = new SQLWarehouseConnector({});

  it("decodes inline ARROW_STREAM attachments and stringifies BigInt columns", () => {
    // Int64 columns decode to JS BigInt, which JSON.stringify rejects — the
    // transform must convert them to strings.
    const table = new Table({
      id: vectorFromArray([1n, 2n], new Int64()),
      name: vectorFromArray(["alice", "bob"]),
      count: vectorFromArray([100n, 250n], new Int64()),
    });

    const response = {
      manifest: { format: "ARROW_STREAM" },
      result: { attachment: arrowAttachment(table) },
    } as unknown as sql.StatementResponse;

    const out = callTransform(client, response) as unknown as {
      result: { data: Array<Record<string, unknown>>; attachment?: string };
    };

    // The raw base64 payload must be stripped from the transformed result.
    expect(out.result.attachment).toBeUndefined();
    expect(out.result.data).toHaveLength(2);
    expect(out.result.data[0]).toEqual({
      id: "1",
      name: "alice",
      count: "100",
    });
    expect(out.result.data[1]).toEqual({ id: "2", name: "bob", count: "250" });

    // Crucial: the rows must round-trip through JSON without throwing.
    expect(() => JSON.stringify(out.result.data)).not.toThrow();
  });

  it("recursively stringifies BigInts inside list columns and unwraps Arrow Vectors", () => {
    // LIST<BIGINT> columns: apache-arrow infers the list child type from the
    // bigint elements, so this round-trips through IPC as List<Int64>.
    const table = new Table({
      ids: vectorFromArray([
        [1n, 2n, 3n],
        [4n, 5n],
      ]),
      label: vectorFromArray(["x", "y"]),
    });

    const response = {
      manifest: { format: "ARROW_STREAM" },
      result: { attachment: arrowAttachment(table) },
    } as unknown as sql.StatementResponse;

    const out = callTransform(client, response) as unknown as {
      result: { data: Array<Record<string, unknown>> };
    };

    // Arrow Vector children must be flattened to plain arrays AND every nested
    // BigInt must be stringified so the row survives JSON.stringify.
    expect(out.result.data[0].ids).toEqual(["1", "2", "3"]);
    expect(out.result.data[1].ids).toEqual(["4", "5"]);
    expect(() => JSON.stringify(out.result.data)).not.toThrow();
  });

  it("falls through to the EXTERNAL_LINKS ARROW_STREAM branch when no attachment", () => {
    // No attachment, no data_array: legacy ARROW_STREAM path returns just
    // statement_id + status (chunks fetched separately via getArrowData).
    const response = {
      statement_id: "abc-123",
      status: { state: "SUCCEEDED" },
      manifest: { format: "ARROW_STREAM" },
      result: {},
    } as unknown as sql.StatementResponse;

    const out = callTransform(client, response) as unknown as {
      result: { statement_id?: string; data?: unknown };
    };

    expect(out.result.statement_id).toBe("abc-123");
    expect(out.result.data).toBeUndefined();
  });

  it("throws ExecutionError on malformed Arrow IPC attachment", () => {
    // Garbage attachment: decoding must fail loudly with a typed error, not
    // propagate a raw apache-arrow exception.
    const response = {
      manifest: { format: "ARROW_STREAM" },
      result: { attachment: "this-is-not-valid-base64-arrow-ipc" },
    } as unknown as sql.StatementResponse;

    expect(() => callTransform(client, response)).toThrow(
      /Failed to decode Arrow IPC attachment/,
    );
  });

  it("preserves the legacy JSON_ARRAY (data_array) path unchanged", () => {
    // Regression guard: the attachment branch must not disturb the existing
    // data_array -> keyed-row transformation.
    const response = {
      manifest: {
        format: "JSON_ARRAY",
        schema: {
          columns: [
            { name: "id", type_name: "INT" },
            { name: "label", type_name: "STRING" },
          ],
        },
      },
      result: {
        data_array: [
          [1, "a"],
          [2, "b"],
        ],
      },
    } as unknown as sql.StatementResponse;

    const out = callTransform(client, response) as unknown as {
      result: { data: Array<Record<string, unknown>>; data_array?: unknown };
    };

    expect(out.result.data_array).toBeUndefined();
    expect(out.result.data).toEqual([
      { id: 1, label: "a" },
      { id: 2, label: "b" },
    ]);
  });
});
6 changes: 5 additions & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading