From c13aa00e03227b5b9f0e45e11fab9145d10c734a Mon Sep 17 00:00:00 2001 From: Riley Des Date: Mon, 27 Apr 2026 08:57:24 -0500 Subject: [PATCH 01/22] chore(.gitignore): add kiro directory to gitignore Signed-off-by: Riley Des --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index fe0d10b5d..4ad1323ab 100644 --- a/.gitignore +++ b/.gitignore @@ -169,6 +169,9 @@ skills !packages/core/src/workspace/skills !packages/core/src/workspace/skills/** +# kiro +.kiro + dev-debug.log node_modules/ From 59032a07e4c1cbf7adce44c0f1e04d9d47ace441 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 08:13:20 -0500 Subject: [PATCH 02/22] feat(a2a-server): add Valkey task store implementation - Add ValkeyTaskStore class for distributed task persistence using Valkey - Support optional TTL configuration for automatic task expiration - Add createValkeyTaskStore factory function with lazy loading of @valkey/valkey-glide - Make @valkey/valkey-glide an optional peer dependency - Add taskStore configuration option to A2AServerConfig - Update task store initialization to respect precedence: config.taskStore > deps.taskStore > InMemoryTaskStore - Export ValkeyTaskStore from main index Signed-off-by: Riley Des --- packages/a2a-server/package.json | 6 ++ packages/a2a-server/src/index.ts | 1 + packages/a2a-server/src/server.ts | 9 ++- packages/a2a-server/src/types.ts | 5 ++ packages/a2a-server/src/valkey-store.ts | 85 +++++++++++++++++++++++++ pnpm-lock.yaml | 63 ++++++++++++++++++ 6 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 packages/a2a-server/src/valkey-store.ts diff --git a/packages/a2a-server/package.json b/packages/a2a-server/package.json index b3d536dc9..17e989978 100644 --- a/packages/a2a-server/package.json +++ b/packages/a2a-server/package.json @@ -29,8 +29,14 @@ "main": "dist/index.js", "module": "dist/index.mjs", "peerDependencies": { + "@valkey/valkey-glide": ">=2.3.1", "@voltagent/core": "^2.0.0" }, + "peerDependenciesMeta": { + "@valkey/valkey-glide": { + "optional": true + } + }, "repository": { "type": "git", "url": "https://github.com/VoltAgent/voltagent.git", diff --git a/packages/a2a-server/src/index.ts b/packages/a2a-server/src/index.ts index 673b8a56b..11b92af4c 100644 --- a/packages/a2a-server/src/index.ts +++ b/packages/a2a-server/src/index.ts @@ -2,6 +2,7 @@ export * from "./types"; export * from "./server"; export * from "./protocol"; export * from "./store"; +export * from "./valkey-store"; export * from "./tasks"; export * from "./adapters/agent"; export * from "./adapters/message"; diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index eb801f063..4937ef68f 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -76,10 +76,17 @@ export class A2AServer { } } + /** + * Initializes the server with runtime dependencies. + * + * Task store precedence: `config.taskStore` (constructor) > `deps.taskStore` > `InMemoryTaskStore`. + * If a `taskStore` was provided in the `A2AServerConfig` constructor, it takes priority over + * the one supplied here in `deps`. + */ initialize(deps: A2AServerDeps): void { this.deps = { ...deps, - taskStore: deps.taskStore ?? new InMemoryTaskStore(), + taskStore: this.config.taskStore ?? deps.taskStore ?? new InMemoryTaskStore(), } as Required; } diff --git a/packages/a2a-server/src/types.ts b/packages/a2a-server/src/types.ts index 352db4eeb..3c8d240e2 100644 --- a/packages/a2a-server/src/types.ts +++ b/packages/a2a-server/src/types.ts @@ -144,6 +144,11 @@ export interface A2AServerConfig { }; agents?: Record; filterAgents?: A2AFilterFunction; + /** + * Optional task store. When provided, takes precedence over `deps.taskStore` passed to + * `A2AServer.initialize()`. Falls back to an in-memory store if neither is set. + */ + taskStore?: TaskStore; } export interface A2AServerMetadata extends BaseA2AServerMetadata {} diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts new file mode 100644 index 000000000..86999afeb --- /dev/null +++ b/packages/a2a-server/src/valkey-store.ts @@ -0,0 +1,85 @@ +import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; +import { safeStringify } from "@voltagent/internal"; +import type { TaskRecord, TaskStore } from "./types"; + +export interface ValkeyTaskStoreOptions { + client: GlideClient | GlideClusterClient; + keyPrefix?: string; + ttlSeconds?: number; +} + +const VALKEY_GLIDE_REQUIRED = + "@valkey/valkey-glide is required for ValkeyTaskStore. Install it with: pnpm add @valkey/valkey-glide"; + +export async function createValkeyTaskStore( + options: ValkeyTaskStoreOptions, +): Promise { + let timeUnitSeconds: TimeUnit | undefined; + if (options.ttlSeconds !== undefined) { + try { + const mod = await import("@valkey/valkey-glide"); + timeUnitSeconds = mod.TimeUnit.Seconds; + } catch { + throw new Error(VALKEY_GLIDE_REQUIRED); + } + } + return new ValkeyTaskStore(options, timeUnitSeconds); +} + +export class ValkeyTaskStore implements TaskStore { + private readonly client: GlideClient | GlideClusterClient; + private readonly keyPrefix: string; + private readonly ttlSeconds?: number; + private timeUnitSeconds?: TimeUnit; + + // In-process only, not propagated across instances via Valkey. + readonly activeCancellations = new Set(); + + constructor(options: ValkeyTaskStoreOptions, timeUnitSeconds?: TimeUnit) { + this.client = options.client; + this.keyPrefix = options.keyPrefix ?? "a2a-tasks"; + this.ttlSeconds = options.ttlSeconds; + this.timeUnitSeconds = timeUnitSeconds; + } + + private async getTimeUnitSeconds(): Promise { + if (this.timeUnitSeconds !== undefined) return this.timeUnitSeconds; + try { + const mod = await import("@valkey/valkey-glide"); + this.timeUnitSeconds = mod.TimeUnit.Seconds; + return this.timeUnitSeconds; + } catch { + throw new Error(VALKEY_GLIDE_REQUIRED); + } + } + + async load(params: { agentId: string; taskId: string }): Promise { + const key = this.makeKey(params.agentId, params.taskId); + const result = await this.client.get(key); + if (result === null) return null; + try { + return JSON.parse(String(result)) as TaskRecord; + } catch (error) { + const detail = error instanceof Error ? error.message : "Unknown error"; + throw new Error(`Failed to parse stored TaskRecord for key "${key}": ${detail}`); + } + } + + async save(params: { agentId: string; data: TaskRecord }): Promise { + const key = this.makeKey(params.agentId, params.data.id); + const json = safeStringify(params.data); + + if (this.ttlSeconds !== undefined) { + const seconds = await this.getTimeUnitSeconds(); + await this.client.set(key, json, { + expiry: { type: seconds, count: this.ttlSeconds }, + }); + } else { + await this.client.set(key, json); + } + } + + private makeKey(agentId: string, taskId: string): string { + return `${this.keyPrefix}:${agentId}::${taskId}`; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 62ea9b37d..8854f4485 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21209,6 +21209,69 @@ packages: - graphql dev: false + /@valkey/valkey-glide-darwin-arm64@2.3.1: + resolution: {integrity: sha512-S3uCwGoBqQuVieBsevftlXzy/lFkDf49wQkg0UaRGMoWhmaik8d9rwTzKky1+otLjWCblWtMDcqIWGMs5ppo8Q==} + cpu: [arm64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-darwin-x64@2.3.1: + resolution: {integrity: sha512-7F+UoyMhO0QiAHEDZApy0+PNR7v7Gct5SMiRH+8RuUk8Nf1Cc3mNVtFXk0jwwbhatiWiSNEaMDkTRPWYZN6IcQ==} + cpu: [x64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-arm64-gnu@2.3.1: + resolution: {integrity: sha512-Aigxpq6oSxPhdX7V0JZsQrBW4pzcNibCb1m6XorGdWpG62sttV/3DKz/06V+h1dSGfqpkjWPxRJS1EmTqPYmZw==} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-arm64-musl@2.3.1: + resolution: {integrity: sha512-vmohWbUncgH+Rz1OnXPeSDVChCFMy0MYL38qcmAS05aGwL2sgMjipsEWiAsc82IAauKsVE7Ou68YZcyYK/mhag==} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-x64-gnu@2.3.1: + resolution: {integrity: sha512-v/pJefhNO2fjEKq7uQAked896S2N3ZymiO/ei2v8JRxbYk1M+/Ffl6NH6SG+Mo/k2dyeRN1B5PXNq79jv149Rg==} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-x64-musl@2.3.1: + resolution: {integrity: sha512-GAcWs6MrS4v6/mjoaYyd8ay+L6Zl2L9YjkuKsupWzjSsT+PEVV/3yX0ecNeK7SGMn5Wfd3InZ3gfGPEsWEhc0g==} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide@2.3.1: + resolution: {integrity: sha512-HzKCoNRSJYG83vKrYVJk+By2a0qVt/VuQfziCtdM6Q3Lt2WUE1PqxUAwO1Br2flSgrTxNdGgaGmX8qdnM25d5w==} + engines: {node: '>=16'} + dependencies: + long: 5.3.2 + protobufjs: 7.5.3 + optionalDependencies: + '@valkey/valkey-glide-darwin-arm64': 2.3.1 + '@valkey/valkey-glide-darwin-x64': 2.3.1 + '@valkey/valkey-glide-linux-arm64-gnu': 2.3.1 + '@valkey/valkey-glide-linux-arm64-musl': 2.3.1 + '@valkey/valkey-glide-linux-x64-gnu': 2.3.1 + '@valkey/valkey-glide-linux-x64-musl': 2.3.1 + dev: false + /@vercel/nft@0.29.4(supports-color@10.2.2): resolution: {integrity: sha512-6lLqMNX3TuycBPABycx7A9F1bHQR7kiQln6abjFbPrf5C/05qHM9M5E4PeTE59c7z8g6vHnx1Ioihb2AQl7BTA==} engines: {node: '>=18'} From 9dff21f4274a09fcab23dc7affe1bb8e54789381 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 08:18:16 -0500 Subject: [PATCH 03/22] test(a2a-server): add comprehensive tests for ValkeyTaskStore - Add unit tests for load() method covering deserialization, null handling, and Buffer conversion - Add unit tests for save() method covering composite key generation, default keyPrefix, JSON serialization, and TTL options - Add error handling tests for corrupted JSON and client connection failures - Add tests for activeCancellations Set exposure - Add factory function tests for createValkeyTaskStore with and without TTL configuration - Mock @valkey/valkey-glide to avoid runtime dependency in tests - Include helper functions for creating mock clients and TaskRecord fixtures Signed-off-by: Riley Des --- packages/a2a-server/src/valkey-store.spec.ts | 178 +++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 packages/a2a-server/src/valkey-store.spec.ts diff --git a/packages/a2a-server/src/valkey-store.spec.ts b/packages/a2a-server/src/valkey-store.spec.ts new file mode 100644 index 000000000..9061e9093 --- /dev/null +++ b/packages/a2a-server/src/valkey-store.spec.ts @@ -0,0 +1,178 @@ +import { safeStringify } from "@voltagent/internal"; +import type { TaskRecord } from "./types"; +import { ValkeyTaskStore, createValkeyTaskStore } from "./valkey-store"; + +// Mock @valkey/valkey-glide so tests don't require the actual package +vi.mock("@valkey/valkey-glide", () => ({ + TimeUnit: { Seconds: "EX" }, +})); + +function makeClient() { + return { + get: vi.fn(), + set: vi.fn(), + }; +} + +function makeTaskRecord(overrides: Partial = {}): TaskRecord { + return { + id: "task-1", + contextId: "ctx-1", + status: { state: "submitted", timestamp: new Date().toISOString() }, + history: [], + ...overrides, + }; +} + +describe("ValkeyTaskStore", () => { + it("load() returns deserialized TaskRecord when client.get returns a JSON string", async () => { + const client = makeClient(); + const record = makeTaskRecord(); + client.get.mockResolvedValue(safeStringify(record)); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toEqual(record); + }); + + it("load() returns null when client.get returns null", async () => { + const client = makeClient(); + client.get.mockResolvedValue(null); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toBeNull(); + }); + + it("load() handles GlideString Buffer values by converting to string", async () => { + const client = makeClient(); + const record = makeTaskRecord(); + client.get.mockResolvedValue(Buffer.from(safeStringify(record))); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toEqual(record); + }); + + it("save() calls client.set with the correct composite key {keyPrefix}:{agentId}::{taskId}", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-42" }); + + const store = new ValkeyTaskStore({ client, keyPrefix: "my-prefix" } as any); + await store.save({ agentId: "agent-x", data: record }); + + expect(client.set.mock.calls[0][0]).toBe("my-prefix:agent-x::task-42"); + }); + + it("save() uses default keyPrefix 'a2a-tasks' when none is provided", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-42" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-x", data: record }); + + expect(client.set.mock.calls[0][0]).toBe("a2a-tasks:agent-x::task-42"); + }); + + it("save() serializes the TaskRecord as a JSON string containing the task data", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-99" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-1", data: record }); + + const storedValue = client.set.mock.calls[0][1]; + expect(typeof storedValue).toBe("string"); + const parsed = JSON.parse(storedValue); + expect(parsed.id).toBe("task-99"); + expect(parsed.contextId).toBe(record.contextId); + }); + + it("save() calls client.set with expiry options when ttlSeconds is configured", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-ttl" }); + + const store = new ValkeyTaskStore({ client, ttlSeconds: 300 } as any); + await store.save({ agentId: "agent-1", data: record }); + + const options = client.set.mock.calls[0][2]; + expect(options).toEqual({ expiry: { type: "EX", count: 300 } }); + }); + + it("save() calls client.set without expiry options when ttlSeconds is not configured", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-no-ttl" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-1", data: record }); + + expect(client.set.mock.calls[0]).toHaveLength(2); + }); + + it("load() propagates errors thrown by client.get", async () => { + const client = makeClient(); + client.get.mockRejectedValue(new Error("connection refused")); + + const store = new ValkeyTaskStore({ client } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + "connection refused", + ); + }); + + it("load() throws a descriptive error when stored data is corrupted JSON", async () => { + const client = makeClient(); + client.get.mockResolvedValue("not-valid-json{{{"); + + const store = new ValkeyTaskStore({ client, keyPrefix: "pfx" } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + /Failed to parse stored TaskRecord for key "pfx:agent-1::task-1"/, + ); + }); + + it("save() propagates errors thrown by client.set", async () => { + const client = makeClient(); + client.set.mockRejectedValue(new Error("write timeout")); + const record = makeTaskRecord({ id: "task-err" }); + + const store = new ValkeyTaskStore({ client } as any); + await expect(store.save({ agentId: "agent-1", data: record })).rejects.toThrow("write timeout"); + }); + + it("exposes activeCancellations Set for task cancellation signaling", () => { + const client = makeClient(); + const store = new ValkeyTaskStore({ client } as any); + + expect(store.activeCancellations).toBeInstanceOf(Set); + }); +}); + +describe("createValkeyTaskStore", () => { + it("returns a ValkeyTaskStore instance with eagerly-resolved TimeUnit", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + + const store = await createValkeyTaskStore({ client, ttlSeconds: 60 } as any); + + expect(store).toBeInstanceOf(ValkeyTaskStore); + + const record = makeTaskRecord({ id: "task-factory" }); + await store.save({ agentId: "agent-1", data: record }); + + const options = client.set.mock.calls[0][2]; + expect(options).toEqual({ expiry: { type: "EX", count: 60 } }); + }); + + it("returns a ValkeyTaskStore without resolving TimeUnit when no ttlSeconds", async () => { + const client = makeClient(); + const store = await createValkeyTaskStore({ client } as any); + expect(store).toBeInstanceOf(ValkeyTaskStore); + }); +}); From 1720d40f3e2c3cabcd88ea9280f4057002e52323 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 08:22:39 -0500 Subject: [PATCH 04/22] feat(resumable-streams): add Valkey store implementation - Add Valkey/Glide-based store for resumable streams with pub/sub support - Export internal utilities for reuse by store implementations (markResumableStreamStoreType, buildActiveStreamKey, createActiveStreamStoreFromPublisher, mergeStreamAndActiveStore) - Add ValkeyConnectionConfig and ResumableStreamValkeyStoreOptions types for configuration - Implement subscription management with configurable max concurrent connections (default 1000) - Add TTL support for active stream keys via optional ttlSeconds parameter - Add @valkey/valkey-glide as optional peer dependency - Export createResumableStreamValkeyStore factory Signed-off-by: Riley Des --- packages/resumable-streams/package.json | 6 + packages/resumable-streams/src/index.ts | 6 + .../src/resumable-streams.ts | 8 +- .../resumable-streams/src/valkey-store.ts | 182 ++++++++++++++++++ 4 files changed, 198 insertions(+), 4 deletions(-) create mode 100644 packages/resumable-streams/src/valkey-store.ts diff --git a/packages/resumable-streams/package.json b/packages/resumable-streams/package.json index 8c2c03a27..8efce75de 100644 --- a/packages/resumable-streams/package.json +++ b/packages/resumable-streams/package.json @@ -35,9 +35,15 @@ "main": "dist/index.js", "module": "dist/index.mjs", "peerDependencies": { + "@valkey/valkey-glide": ">=2.3.1", "@voltagent/core": "^2.0.0", "ai": "^6.0.0" }, + "peerDependenciesMeta": { + "@valkey/valkey-glide": { + "optional": true + } + }, "publishConfig": { "access": "public" }, diff --git a/packages/resumable-streams/src/index.ts b/packages/resumable-streams/src/index.ts index 9e47ff561..2deda4dd5 100644 --- a/packages/resumable-streams/src/index.ts +++ b/packages/resumable-streams/src/index.ts @@ -24,3 +24,9 @@ export { type ResumableChatHandlersOptions, } from "./chat-handlers"; export { createResumableChatSession, type ResumableChatSession } from "./chat-session"; +export { createResumableStreamValkeyStore } from "./valkey-store"; +export type { + ResumableStreamValkeyStoreOptions, + ValkeyConnectionConfig, + ValkeyResumableStreamStore, +} from "./valkey-store"; diff --git a/packages/resumable-streams/src/resumable-streams.ts b/packages/resumable-streams/src/resumable-streams.ts index cad154466..97071a7e8 100644 --- a/packages/resumable-streams/src/resumable-streams.ts +++ b/packages/resumable-streams/src/resumable-streams.ts @@ -84,7 +84,7 @@ const getResumableStreamDisabledInfo = (value: unknown) => { return { reason, docsUrl }; }; -const markResumableStreamStoreType = ( +export const markResumableStreamStoreType = ( value: T, type: string, displayName?: string, @@ -201,7 +201,7 @@ const buildStreamKey = ({ conversationId, userId }: ResumableStreamContext) => { return `${userId}-${conversationId}`; }; -const buildActiveStreamKey = (keyPrefix: string, context: ResumableStreamContext) => +export const buildActiveStreamKey = (keyPrefix: string, context: ResumableStreamContext) => `${keyPrefix}:active:${buildStreamKey(context)}`; const buildActiveStreamQuery = (context: ResumableStreamContext, streamId?: string): string => { @@ -216,7 +216,7 @@ const buildActiveStreamQuery = (context: ResumableStreamContext, streamId?: stri return params.toString(); }; -const createActiveStreamStoreFromPublisher = ( +export const createActiveStreamStoreFromPublisher = ( publisher: ResumableStreamPublisher, keyPrefix: string, ): ResumableStreamActiveStore => ({ @@ -250,7 +250,7 @@ const createActiveStreamStoreFromPublisher = ( }, }); -const mergeStreamAndActiveStore = ( +export const mergeStreamAndActiveStore = ( streamStore: T, activeStreamStore: ResumableStreamActiveStore, ): T & ResumableStreamActiveStore => ({ diff --git a/packages/resumable-streams/src/valkey-store.ts b/packages/resumable-streams/src/valkey-store.ts new file mode 100644 index 000000000..ba1be6d2e --- /dev/null +++ b/packages/resumable-streams/src/valkey-store.ts @@ -0,0 +1,182 @@ +import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; +import { + buildActiveStreamKey, + createActiveStreamStoreFromPublisher, + markResumableStreamStoreType, + mergeStreamAndActiveStore, +} from "./resumable-streams"; +import type { ResumableStreamActiveStore, ResumableStreamStore } from "./types"; + +const DEFAULT_KEY_PREFIX = "resumable-stream"; + +export interface ValkeyConnectionConfig { + addresses: Array<{ host: string; port: number }>; + useTLS?: boolean; + requestTimeout?: number; + clientName?: string; + [key: string]: unknown; +} + +export interface ResumableStreamValkeyStoreOptions { + client: GlideClient | GlideClusterClient; + clientConfig: ValkeyConnectionConfig; + keyPrefix?: string; + // Applied to active stream keys only; stream data keys are managed by resumable-stream/generic + ttlSeconds?: number; + /** + * Maximum number of concurrent subscription channels. Each subscription creates a + * dedicated GlideClient TCP connection (required by the Glide pub/sub model), so this + * also caps the number of open connections. Defaults to 1000. + */ + maxSubscriptions?: number; + waitUntil?: ((promise: Promise) => void) | null; +} + +export type ValkeyResumableStreamStore = ResumableStreamStore & + ResumableStreamActiveStore & { + close(): Promise; + }; + +export async function createResumableStreamValkeyStore( + options: ResumableStreamValkeyStoreOptions, +): Promise { + let GlideClientClass: typeof GlideClient; + let GlideClientConfigurationClass: { PubSubChannelModes: { Exact: number } }; + let timeUnit: typeof TimeUnit; + + try { + const mod = await import("@valkey/valkey-glide"); + GlideClientClass = mod.GlideClient; + // PubSubChannelModes isn't exported as a value type; cast to access the enum directly. + const configClass = mod.GlideClientConfiguration as unknown as Record; + const pubSubModes = (configClass as { PubSubChannelModes?: { Exact?: number } }) + ?.PubSubChannelModes; + if (pubSubModes?.Exact === undefined) { + throw new Error( + "GlideClientConfiguration.PubSubChannelModes.Exact is not available. " + + "The installed version of @valkey/valkey-glide may be incompatible.", + ); + } + GlideClientConfigurationClass = configClass as { PubSubChannelModes: { Exact: number } }; + timeUnit = mod.TimeUnit; + } catch (err) { + if (err instanceof Error && err.message.includes("PubSubChannelModes")) { + throw err; + } + throw new Error( + "@valkey/valkey-glide is required for createResumableStreamValkeyStore. " + + "Install it with: pnpm add @valkey/valkey-glide", + ); + } + + const keyPrefix = options.keyPrefix ?? DEFAULT_KEY_PREFIX; + const { client, clientConfig } = options; + + // Publisher adapter + const publisher = { + async connect() {}, + async publish(channel: string, message: string) { + return client.publish(channel, message); + }, + async set(key: string, value: string, setOptions?: { EX?: number }) { + if (setOptions?.EX !== undefined) { + return client.set(key, value, { + expiry: { type: timeUnit.Seconds, count: setOptions.EX }, + }); + } + return client.set(key, value); + }, + async get(key: string): Promise { + const result = await client.get(key); + return result !== null ? String(result) : null; + }, + async incr(key: string) { + return client.incr(key); + }, + async del(key: string) { + return client.del([key]); + }, + }; + + // Subscriber adapter — one dedicated GlideClient per channel (Glide pub/sub requirement). + const maxSubscriptions = options.maxSubscriptions ?? 1000; + const subscriptionClients = new Map(); + + const subscriber = { + async connect() {}, + async subscribe(channel: string, callback: (message: string) => void) { + // Close any existing client for this channel to avoid resource leaks on duplicate calls + const existing = subscriptionClients.get(channel); + if (existing) { + existing.close(); + subscriptionClients.delete(channel); + } + + if (subscriptionClients.size >= maxSubscriptions) { + throw new Error( + `Maximum subscription limit (${maxSubscriptions}) reached. Unsubscribe from existing channels before subscribing to new ones.`, + ); + } + + const subClient = await GlideClientClass.createClient({ + ...clientConfig, + pubsubSubscriptions: { + channelsAndPatterns: { + [GlideClientConfigurationClass.PubSubChannelModes.Exact]: new Set([channel]), + }, + callback: (msg: { message: unknown }, _ctx: unknown) => callback(msg.message as string), + }, + }); + subscriptionClients.set(channel, subClient); + }, + async unsubscribe(channel: string) { + const subClient = subscriptionClients.get(channel); + if (subClient) { + subClient.close(); + subscriptionClients.delete(channel); + } + }, + }; + + const { createResumableStreamContext } = await import("resumable-stream/generic"); + + const streamStore = createResumableStreamContext({ + keyPrefix, + waitUntil: options.waitUntil ?? null, + publisher, + subscriber, + }) as ResumableStreamStore; + + const activeStreamStore = createActiveStreamStoreFromPublisher(publisher, keyPrefix); + + // Wire ttlSeconds into setActiveStreamId so active stream keys expire + const ttlSeconds = options.ttlSeconds; + const ttlActiveStreamStore = + ttlSeconds !== undefined + ? { + ...activeStreamStore, + async setActiveStreamId( + context: Parameters[0], + streamId: string, + ) { + const key = buildActiveStreamKey(keyPrefix, context); + await publisher.set(key, streamId, { EX: ttlSeconds }); + }, + } + : activeStreamStore; + + const mergedStore = mergeStreamAndActiveStore(streamStore, ttlActiveStreamStore); + const taggedStore = markResumableStreamStoreType(mergedStore, "valkey", "Valkey"); + + return { + ...taggedStore, + async close() { + const closePromises: Promise[] = []; + for (const subClient of subscriptionClients.values()) { + closePromises.push(Promise.resolve(subClient.close())); + } + subscriptionClients.clear(); + await Promise.all(closePromises); + }, + }; +} From 75d003c3f716bd9269ba28cd8c053f29c8d66b38 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 08:23:25 -0500 Subject: [PATCH 05/22] test(resumable-streams): add comprehensive tests for ValkeyStore - Add test suite for ValkeyResumableStreamStore with publisher and subscriber adapters - Test publisher operations: set with/without expiry, get with Buffer conversion, incr, publish, del - Test subscriber operations: subscribe with pubsub config, callback invocation, unsubscribe cleanup - Test factory method returns all required store methods and applies TTL configuration - Add vitest configuration for resumable-streams package - Update tsconfig.json to support test environment - Ensure proper mocking of @valkey/valkey-glide and resumable-stream/generic modules Signed-off-by: Riley Des --- .../src/valkey-store.spec.ts | 360 ++++++++++++++++++ packages/resumable-streams/tsconfig.json | 6 +- packages/resumable-streams/vitest.config.ts | 17 + 3 files changed, 380 insertions(+), 3 deletions(-) create mode 100644 packages/resumable-streams/src/valkey-store.spec.ts create mode 100644 packages/resumable-streams/vitest.config.ts diff --git a/packages/resumable-streams/src/valkey-store.spec.ts b/packages/resumable-streams/src/valkey-store.spec.ts new file mode 100644 index 000000000..17d38b4ff --- /dev/null +++ b/packages/resumable-streams/src/valkey-store.spec.ts @@ -0,0 +1,360 @@ +import { createResumableStreamValkeyStore } from "./valkey-store"; + +vi.mock("@valkey/valkey-glide", () => ({ + GlideClient: { createClient: vi.fn() }, + GlideClientConfiguration: { PubSubChannelModes: { Exact: 0 } }, + TimeUnit: { Seconds: "EX" }, +})); + +vi.mock("resumable-stream/generic", () => ({ + createResumableStreamContext: vi.fn().mockReturnValue({ + createNewResumableStream: vi.fn(), + resumeExistingStream: vi.fn(), + }), +})); + +function makeGlideClient() { + return { + get: vi.fn(), + set: vi.fn(), + incr: vi.fn(), + publish: vi.fn(), + del: vi.fn(), + }; +} + +function makeOptions(clientOverrides = {}) { + return { + client: { ...makeGlideClient(), ...clientOverrides }, + clientConfig: { addresses: [{ host: "localhost", port: 6379 }] }, + }; +} + +async function getPublisher(clientOverrides = {}) { + const { createResumableStreamContext } = await import("resumable-stream/generic"); + const mockCtx = vi.mocked(createResumableStreamContext); + mockCtx.mockClear(); + + const opts = makeOptions(clientOverrides); + await createResumableStreamValkeyStore(opts as any); + + const callArgs = mockCtx.mock.calls[0][0] as any; + return { publisher: callArgs.publisher, subscriber: callArgs.subscriber, client: opts.client }; +} + +describe("ValkeyResumableStreamStore — Publisher adapter", () => { + it("set with { EX: 60 } calls client.set with expiry", async () => { + const { publisher, client } = await getPublisher(); + (client as any).set.mockResolvedValue("OK"); + + await publisher.set("my-key", "my-value", { EX: 60 }); + + expect((client as any).set).toHaveBeenCalledWith("my-key", "my-value", { + expiry: { type: "EX", count: 60 }, + }); + }); + + it("set without EX calls client.set with just key and value", async () => { + const { publisher, client } = await getPublisher(); + (client as any).set.mockResolvedValue("OK"); + + await publisher.set("my-key", "my-value"); + + expect((client as any).set).toHaveBeenCalledWith("my-key", "my-value"); + expect((client as any).set).toHaveBeenCalledTimes(1); + expect((client as any).set.mock.calls[0]).toHaveLength(2); + }); + + it("get converts GlideString (Buffer) to string", async () => { + const { publisher, client } = await getPublisher(); + (client as any).get.mockResolvedValue(Buffer.from("hello")); + + const result = await publisher.get("some-key"); + + expect(result).toBe("hello"); + }); + + it("get returns null when client returns null", async () => { + const { publisher, client } = await getPublisher(); + (client as any).get.mockResolvedValue(null); + + const result = await publisher.get("missing-key"); + + expect(result).toBeNull(); + }); + + it("incr delegates to client.incr", async () => { + const { publisher, client } = await getPublisher(); + (client as any).incr.mockResolvedValue(5); + + const result = await publisher.incr("counter-key"); + + expect((client as any).incr).toHaveBeenCalledWith("counter-key"); + expect(result).toBe(5); + }); + + it("publish delegates to client.publish", async () => { + const { publisher, client } = await getPublisher(); + (client as any).publish.mockResolvedValue(1); + + const result = await publisher.publish("my-channel", "my-message"); + + expect((client as any).publish).toHaveBeenCalledWith("my-channel", "my-message"); + expect(result).toBe(1); + }); + + it("del calls client.del([key])", async () => { + const { publisher, client } = await getPublisher(); + (client as any).del.mockResolvedValue(1); + + await publisher.del("some-key"); + + expect((client as any).del).toHaveBeenCalledWith(["some-key"]); + }); +}); + +describe("ValkeyResumableStreamStore — Subscriber adapter", () => { + it("subscribe calls GlideClient.createClient with correct pubsubSubscriptions config", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + const callback = vi.fn(); + + await subscriber.subscribe("test-channel", callback); + + expect(mockCreateClient).toHaveBeenCalledWith( + expect.objectContaining({ + addresses: [{ host: "localhost", port: 6379 }], + pubsubSubscriptions: expect.objectContaining({ + channelsAndPatterns: expect.objectContaining({ + 0: expect.any(Set), + }), + callback: expect.any(Function), + }), + }), + ); + + const callArg = mockCreateClient.mock.calls[0][0] as any; + expect(callArg.pubsubSubscriptions.channelsAndPatterns[0].has("test-channel")).toBe(true); + }); + + it("subscribe callback invokes the provided callback with msg.message", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockClear(); + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + const callback = vi.fn(); + + mockCreateClient.mockClear(); + await subscriber.subscribe("test-channel", callback); + + const callArg = mockCreateClient.mock.calls[0][0] as any; + callArg.pubsubSubscriptions.callback({ message: "hello-world" }, null); + + expect(callback).toHaveBeenCalledWith("hello-world"); + }); + + it("unsubscribe calls close() on the subscription client and removes it", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + await subscriber.subscribe("test-channel", vi.fn()); + await subscriber.unsubscribe("test-channel"); + + expect(mockSubClient.close).toHaveBeenCalledTimes(1); + + // Unsubscribing again should not throw (client already removed) + await expect(subscriber.unsubscribe("test-channel")).resolves.not.toThrow(); + expect(mockSubClient.close).toHaveBeenCalledTimes(1); + }); +}); + +describe("ValkeyResumableStreamStore — factory", () => { + it("returns object with all required store methods", async () => { + const opts = makeOptions(); + const store = await createResumableStreamValkeyStore(opts as any); + + expect(typeof store.createNewResumableStream).toBe("function"); + expect(typeof store.resumeExistingStream).toBe("function"); + expect(typeof store.getActiveStreamId).toBe("function"); + expect(typeof store.setActiveStreamId).toBe("function"); + expect(typeof store.clearActiveStream).toBe("function"); + expect(typeof store.close).toBe("function"); + }); + + it("setActiveStreamId applies ttlSeconds as EX when configured", async () => { + const client = makeGlideClient(); + client.set.mockResolvedValue("OK"); + client.get.mockResolvedValue(null); + + const opts = { + ...makeOptions(), + client, + ttlSeconds: 600, + }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.setActiveStreamId( + { conversationId: "conv-1", userId: "user-1" } as any, + "stream-42", + ); + + expect(client.set).toHaveBeenCalledWith("resumable-stream:active:user-1-conv-1", "stream-42", { + expiry: { type: "EX", count: 600 }, + }); + }); + + it("setActiveStreamId does NOT apply EX when ttlSeconds is not configured", async () => { + const client = makeGlideClient(); + client.set.mockResolvedValue("OK"); + client.get.mockResolvedValue(null); + + const opts = { + ...makeOptions(), + client, + }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.setActiveStreamId( + { conversationId: "conv-1", userId: "user-1" } as any, + "stream-99", + ); + + const setCalls = client.set.mock.calls; + const activeSetCall = setCalls.find( + (c: any[]) => typeof c[0] === "string" && c[0].includes("active:"), + ); + expect(activeSetCall).toBeDefined(); + expect(activeSetCall).toHaveLength(2); + }); + + it("getActiveStreamId returns stored stream ID", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-123"); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBe("stream-123"); + expect(client.get).toHaveBeenCalledWith("resumable-stream:active:user-1-conv-1"); + }); + + it("getActiveStreamId returns null when no active stream exists", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue(null); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBeNull(); + }); + + it("getActiveStreamId returns null when stored value is empty string", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue(""); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBeNull(); + }); + + it("clearActiveStream deletes key when streamId matches current value", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-42"); + client.del.mockResolvedValue(1); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.clearActiveStream({ + conversationId: "conv-1", + userId: "user-1", + streamId: "stream-42", + } as any); + + expect(client.del).toHaveBeenCalledWith(["resumable-stream:active:user-1-conv-1"]); + }); + + it("clearActiveStream does NOT delete key when streamId does not match", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-other"); + client.del.mockResolvedValue(1); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.clearActiveStream({ + conversationId: "conv-1", + userId: "user-1", + streamId: "stream-42", + } as any); + + expect(client.del).not.toHaveBeenCalled(); + }); + + it("store is tagged with valkey type marker", async () => { + const opts = makeOptions(); + const store = (await createResumableStreamValkeyStore(opts as any)) as any; + + expect(store.__voltagentResumableStoreType).toBe("valkey"); + }); + + it("close() closes all subscription clients and clears the map", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient1 = { close: vi.fn() }; + const mockSubClient2 = { close: vi.fn() }; + mockCreateClient + .mockResolvedValueOnce(mockSubClient1 as any) + .mockResolvedValueOnce(mockSubClient2 as any); + + const opts = makeOptions(); + const store = await createResumableStreamValkeyStore(opts as any); + + const { createResumableStreamContext } = await import("resumable-stream/generic"); + const mockCtx = vi.mocked(createResumableStreamContext); + const callArgs = mockCtx.mock.calls[mockCtx.mock.calls.length - 1][0] as any; + await callArgs.subscriber.subscribe("channel-1", vi.fn()); + await callArgs.subscriber.subscribe("channel-2", vi.fn()); + + await store.close(); + + expect(mockSubClient1.close).toHaveBeenCalledTimes(1); + expect(mockSubClient2.close).toHaveBeenCalledTimes(1); + + // Calling close() again should be safe (map is cleared) + await store.close(); + expect(mockSubClient1.close).toHaveBeenCalledTimes(1); + expect(mockSubClient2.close).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/resumable-streams/tsconfig.json b/packages/resumable-streams/tsconfig.json index d4612442c..a8814988a 100644 --- a/packages/resumable-streams/tsconfig.json +++ b/packages/resumable-streams/tsconfig.json @@ -23,8 +23,8 @@ "esModuleInterop": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true, - "types": ["node"] + "types": ["vitest/globals", "node"] }, - "include": ["src/**/*.ts"], - "exclude": ["node_modules", "dist"] + "include": ["src/**/*.ts", "src/**/*.spec-d.ts"], + "exclude": ["node_modules", "dist", "src/**/*.spec.ts"] } diff --git a/packages/resumable-streams/vitest.config.ts b/packages/resumable-streams/vitest.config.ts new file mode 100644 index 000000000..236b2f5eb --- /dev/null +++ b/packages/resumable-streams/vitest.config.ts @@ -0,0 +1,17 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.spec.ts"], + environment: "node", + coverage: { + provider: "v8", + reporter: ["text", "json", "html"], + include: ["src/**/*.ts"], + exclude: ["src/**/*.d.ts", "src/**/index.ts"], + }, + globals: true, + testTimeout: 10000, + hookTimeout: 10000, + }, +}); From 70c021a806618c6c92be94edae1126faec8d4672 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 08:37:26 -0500 Subject: [PATCH 06/22] feat(examples): add Valkey store example with A2A and resumable streams - Add complete example demonstrating ValkeyTaskStore and ValkeyResumableStreamStore integration - Include SupportAgent implementation backed by Valkey for distributed persistence - Add environment configuration template with Valkey connection defaults - Include comprehensive README with setup instructions, configuration options, and usage examples - Add changeset documenting new Valkey store providers in a2a-server and resumable-streams packages - Demonstrate both standalone and cluster Valkey deployments with optional TTL-based expiration Signed-off-by: Riley Des --- .changeset/add-valkey-store-providers.md | 12 ++ examples/with-valkey-store/.env.example | 5 + examples/with-valkey-store/.gitignore | 3 + examples/with-valkey-store/README.md | 170 ++++++++++++++++++ examples/with-valkey-store/package.json | 32 ++++ .../with-valkey-store/src/agents/assistant.ts | 21 +++ examples/with-valkey-store/src/index.ts | 67 +++++++ examples/with-valkey-store/tsconfig.json | 14 ++ pnpm-lock.yaml | 51 +++++- 9 files changed, 374 insertions(+), 1 deletion(-) create mode 100644 .changeset/add-valkey-store-providers.md create mode 100644 examples/with-valkey-store/.env.example create mode 100644 examples/with-valkey-store/.gitignore create mode 100644 examples/with-valkey-store/README.md create mode 100644 examples/with-valkey-store/package.json create mode 100644 examples/with-valkey-store/src/agents/assistant.ts create mode 100644 examples/with-valkey-store/src/index.ts create mode 100644 examples/with-valkey-store/tsconfig.json diff --git a/.changeset/add-valkey-store-providers.md b/.changeset/add-valkey-store-providers.md new file mode 100644 index 000000000..6c9a26c88 --- /dev/null +++ b/.changeset/add-valkey-store-providers.md @@ -0,0 +1,12 @@ +--- +"@voltagent/a2a-server": minor +"@voltagent/resumable-streams": minor +--- + +feat: add Valkey-backed TaskStore and ResumableStreamStore providers + +Adds `ValkeyTaskStore` to `@voltagent/a2a-server` and `createResumableStreamValkeyStore` to +`@voltagent/resumable-streams`, enabling distributed persistence via the `@valkey/valkey-glide` +client library. Both stores support configurable key prefixes, optional TTL-based expiration, and +standalone or cluster Valkey deployments. The `@valkey/valkey-glide` peer dependency is optional so +consumers who don't use Valkey are unaffected. diff --git a/examples/with-valkey-store/.env.example b/examples/with-valkey-store/.env.example new file mode 100644 index 000000000..6bd9978be --- /dev/null +++ b/examples/with-valkey-store/.env.example @@ -0,0 +1,5 @@ +OPENAI_API_KEY= + +# Valkey connection (defaults shown) +VALKEY_HOST=localhost +VALKEY_PORT=6379 diff --git a/examples/with-valkey-store/.gitignore b/examples/with-valkey-store/.gitignore new file mode 100644 index 000000000..9c97bbd46 --- /dev/null +++ b/examples/with-valkey-store/.gitignore @@ -0,0 +1,3 @@ +node_modules +dist +.env diff --git a/examples/with-valkey-store/README.md b/examples/with-valkey-store/README.md new file mode 100644 index 000000000..063428b59 --- /dev/null +++ b/examples/with-valkey-store/README.md @@ -0,0 +1,170 @@ +
+ +VoltAgent banner + + +
+
+ + +
+ +
+ +
+ VoltAgent is an open source TypeScript framework for building and orchestrating AI agents.
+ Escape the limitations of no-code builders and the complexity of starting from scratch. +
+
+
+ +
+ +[![npm version](https://img.shields.io/npm/v/@voltagent/core.svg)](https://www.npmjs.com/package/@voltagent/core) +[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.0-4baaaa.svg)](../../CODE_OF_CONDUCT.md) +[![Discord](https://img.shields.io/discord/1361559153780195478.svg?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2)](https://s.voltagent.dev/discord) +[![Twitter Follow](https://img.shields.io/twitter/follow/voltagent_dev?style=social)](https://twitter.com/voltagent_dev) + +
+ +
+ +# VoltAgent with Valkey Store Example + +This example demonstrates how to use **Valkey** as a distributed backing store for both A2A task persistence and resumable streaming in VoltAgent. It uses the `@valkey/valkey-glide` client library for high-performance access to Valkey (standalone or cluster). + +## What you get + +- **ValkeyTaskStore** — Persists A2A task records to Valkey with configurable key prefixes and TTL-based expiration. +- **ValkeyResumableStreamStore** — Manages resumable streaming sessions via Valkey pub/sub and key-value operations. +- A minimal VoltAgent project with a `SupportAgent` exposed over the A2A protocol, backed entirely by Valkey. + +## Structure + +``` +examples/with-valkey-store +├── src/ +│ ├── agents/assistant.ts # Example agent definition +│ └── index.ts # VoltAgent bootstrap with Valkey stores +├── .env.example # Environment variable template +├── package.json +├── tsconfig.json +└── README.md +``` + +## Prerequisites + +- Node.js 20+ +- `pnpm` +- A running Valkey instance (or Redis-compatible server) +- `OPENAI_API_KEY` in your environment + +### Start Valkey locally with Docker + +```bash +docker run -d --name valkey -p 6379:6379 valkey/valkey:8 +``` + +## Run locally + +1. Copy the environment template and fill in your keys: + +```bash +cp .env.example .env +``` + +2. Install dependencies and start the dev server: + +```bash +pnpm install +pnpm --filter voltagent-example-with-valkey-store dev +``` + +The server listens on `http://localhost:3141`. + +## Configuration + +Environment variables: + +| Variable | Default | Description | +| ---------------- | ----------- | ------------------------------------ | +| `OPENAI_API_KEY` | — | OpenAI API key for the example agent | +| `VALKEY_HOST` | `localhost` | Valkey server hostname | +| `VALKEY_PORT` | `6379` | Valkey server port | + +### Key prefixes and TTL + +Both stores accept `keyPrefix` and `ttlSeconds` options: + +```typescript +// Task store — keys like "my-tasks:agentId::taskId" +const taskStore = await createValkeyTaskStore({ + client: valkeyClient, + keyPrefix: "my-tasks", + ttlSeconds: 3600, +}); + +// Stream store — keys like "my-streams:active:userId-conversationId" +const streamStore = await createResumableStreamValkeyStore({ + client: valkeyClient, + clientConfig: { addresses: [{ host: "localhost", port: 6379 }] }, + keyPrefix: "my-streams", + ttlSeconds: 600, +}); +``` + +### Cluster mode + +Both stores accept `GlideClient` or `GlideClusterClient`: + +```typescript +import { GlideClusterClient } from "@valkey/valkey-glide"; + +const clusterClient = await GlideClusterClient.createClient({ + addresses: [ + { host: "node1.example.com", port: 6379 }, + { host: "node2.example.com", port: 6379 }, + ], + useTLS: true, +}); + +const taskStore = new ValkeyTaskStore({ client: clusterClient }); +``` + +## Try it + +```bash +# Fetch the agent card +curl http://localhost:3141/.well-known/supportagent/agent-card.json | jq + +# Send a message +curl -X POST http://localhost:3141/a2a/supportagent \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": "1", + "method": "message/send", + "params": { + "message": { + "kind": "message", + "role": "user", + "messageId": "msg-1", + "parts": [{ "kind": "text", "text": "What time is it?" }] + } + } + }' +``` + +## Next steps + +- Adjust `ttlSeconds` to match your retention requirements (or omit it for no expiration). +- Use `GlideClusterClient` for production Valkey cluster deployments (e.g., AWS ElastiCache Valkey). +- Add TLS by setting `useTLS: true` in the client configuration. + +Happy hacking! 🚀 diff --git a/examples/with-valkey-store/package.json b/examples/with-valkey-store/package.json new file mode 100644 index 000000000..0b8cd1ecd --- /dev/null +++ b/examples/with-valkey-store/package.json @@ -0,0 +1,32 @@ +{ + "name": "voltagent-example-with-valkey-store", + "version": "0.0.0", + "dependencies": { + "@valkey/valkey-glide": "^2.3.1", + "@voltagent/a2a-server": "^2.0.3", + "@voltagent/core": "^2.7.2", + "@voltagent/internal": "^1.0.3", + "@voltagent/logger": "^2.0.2", + "@voltagent/resumable-streams": "^2.0.2", + "@voltagent/server-hono": "^2.0.12", + "ai": "^6.0.0", + "zod": "^3.25.76" + }, + "devDependencies": { + "@types/node": "^24.2.1", + "tsx": "^4.21.0", + "typescript": "^5.8.2" + }, + "private": true, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "examples/with-valkey-store" + }, + "scripts": { + "build": "tsc", + "dev": "tsx watch --env-file=.env ./src", + "start": "node dist/index.js" + }, + "type": "module" +} diff --git a/examples/with-valkey-store/src/agents/assistant.ts b/examples/with-valkey-store/src/agents/assistant.ts new file mode 100644 index 000000000..95196b2ea --- /dev/null +++ b/examples/with-valkey-store/src/agents/assistant.ts @@ -0,0 +1,21 @@ +import { Agent, createTool } from "@voltagent/core"; +import { z } from "zod"; + +const statusTool = createTool({ + name: "status", + description: "Return the current time in ISO format", + parameters: z.object({}), + async execute() { + return { + timestamp: new Date().toISOString(), + }; + }, +}); + +export const assistant = new Agent({ + id: "supportagent", + name: "SupportAgent", + instructions: "Reply with helpful answers and include the current time when relevant.", + model: "openai/gpt-4o-mini", + tools: [statusTool], +}); diff --git a/examples/with-valkey-store/src/index.ts b/examples/with-valkey-store/src/index.ts new file mode 100644 index 000000000..63dbc49cc --- /dev/null +++ b/examples/with-valkey-store/src/index.ts @@ -0,0 +1,67 @@ +import { GlideClient } from "@valkey/valkey-glide"; +import { A2AServer, createValkeyTaskStore } from "@voltagent/a2a-server"; +import { VoltAgent } from "@voltagent/core"; +import { createPinoLogger } from "@voltagent/logger"; +import { + createResumableStreamAdapter, + createResumableStreamValkeyStore, +} from "@voltagent/resumable-streams"; +import { honoServer } from "@voltagent/server-hono"; +import { assistant } from "./agents/assistant"; + +const logger = createPinoLogger({ + name: "with-valkey-store", + level: "debug", +}); + +const host = process.env.VALKEY_HOST ?? "localhost"; +const port = Number(process.env.VALKEY_PORT ?? 6379); + +async function main() { + const valkeyClient = await GlideClient.createClient({ + addresses: [{ host, port }], + }); + logger.info(`Connected to Valkey at ${host}:${port}`); + + const taskStore = await createValkeyTaskStore({ + client: valkeyClient, + keyPrefix: "example-tasks", + ttlSeconds: 3600, + }); + + const streamStore = await createResumableStreamValkeyStore({ + client: valkeyClient, + clientConfig: { addresses: [{ host, port }] }, + keyPrefix: "example-streams", + ttlSeconds: 600, + }); + + const streamAdapter = await createResumableStreamAdapter({ + streamStore, + }); + + const a2aServerFactory = () => + new A2AServer({ + name: "SupportAgent", + version: "0.1.0", + description: "A2A server with Valkey-backed task and stream persistence", + taskStore, + }); + + new VoltAgent({ + agents: { assistant }, + a2aServers: { supportAgent: a2aServerFactory }, + server: honoServer({ + port: 3141, + resumableStream: { adapter: streamAdapter }, + }), + logger, + }); + + logger.info("VoltAgent with Valkey stores running on http://localhost:3141"); +} + +main().catch((err) => { + logger.error("Failed to start", { error: err }); + process.exit(1); +}); diff --git a/examples/with-valkey-store/tsconfig.json b/examples/with-valkey-store/tsconfig.json new file mode 100644 index 000000000..e961e4b7f --- /dev/null +++ b/examples/with-valkey-store/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist", + "module": "ESNext", + "moduleResolution": "Node", + "target": "ES2022", + "types": ["node"], + "esModuleInterop": true + }, + "include": ["src/**/*"], + "exclude": ["dist"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8854f4485..6ede9d5f4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2980,6 +2980,46 @@ importers: specifier: ^5.8.2 version: 5.9.2 + examples/with-valkey-store: + dependencies: + '@valkey/valkey-glide': + specifier: ^2.3.1 + version: 2.3.1 + '@voltagent/a2a-server': + specifier: ^2.0.3 + version: link:../../packages/a2a-server + '@voltagent/core': + specifier: ^2.7.2 + version: link:../../packages/core + '@voltagent/internal': + specifier: ^1.0.3 + version: link:../../packages/internal + '@voltagent/logger': + specifier: ^2.0.2 + version: link:../../packages/logger + '@voltagent/resumable-streams': + specifier: ^2.0.2 + version: link:../../packages/resumable-streams + '@voltagent/server-hono': + specifier: ^2.0.12 + version: link:../../packages/server-hono + ai: + specifier: ^6.0.0 + version: 6.0.3(zod@3.25.76) + zod: + specifier: ^3.25.76 + version: 3.25.76 + devDependencies: + '@types/node': + specifier: ^24.2.1 + version: 24.6.2 + tsx: + specifier: ^4.21.0 + version: 4.21.0 + typescript: + specifier: ^5.8.2 + version: 5.9.3 + examples/with-vector-search: dependencies: '@ai-sdk/openai': @@ -3636,6 +3676,9 @@ importers: '@a2a-js/sdk': specifier: ^0.2.5 version: 0.2.5 + '@valkey/valkey-glide': + specifier: '>=2.3.1' + version: 2.3.1 '@voltagent/internal': specifier: ^1.0.2 version: link:../internal @@ -3646,6 +3689,9 @@ importers: '@voltagent/core': specifier: ^2.0.2 version: link:../core + fast-check: + specifier: ^3.23.2 + version: 3.23.2 packages/ag-ui: dependencies: @@ -4223,6 +4269,9 @@ importers: packages/resumable-streams: dependencies: + '@valkey/valkey-glide': + specifier: '>=2.3.1' + version: 2.3.1 '@voltagent/core': specifier: ^2.6.6 version: link:../core @@ -21358,7 +21407,7 @@ packages: '@babel/core': 7.28.5 '@babel/plugin-syntax-typescript': 7.27.1(@babel/core@7.28.5) '@babel/plugin-transform-typescript': 7.28.0(@babel/core@7.28.5) - '@rolldown/pluginutils': 1.0.0-rc.9 + '@rolldown/pluginutils': 1.0.0-rc.17 '@vue/babel-plugin-jsx': 1.5.0(@babel/core@7.28.5) vite: 7.2.7(@types/node@24.2.1)(jiti@2.6.1) vue: 3.5.22(typescript@5.9.3) From cbee14595a41820b9f823664ae90242092eac1dc Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 14:00:51 -0500 Subject: [PATCH 07/22] fix(a2a-server): address review findings for ValkeyTaskStore - Add ttlSeconds validation guard in createValkeyTaskStore - Add debug log when config.taskStore overrides deps.taskStore - Add class-level JSDoc documenting activeCancellations process-local limitation - Mark timeUnitSeconds constructor param as @internal" 2>&1 Signed-off-by: Riley Des --- packages/a2a-server/src/server.ts | 7 +++++++ packages/a2a-server/src/valkey-store.ts | 18 +++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index 4937ef68f..02e166bf2 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -84,6 +84,13 @@ export class A2AServer { * the one supplied here in `deps`. */ initialize(deps: A2AServerDeps): void { + if (this.config.taskStore && deps.taskStore) { + console.debug( + "[A2AServer] config.taskStore is overriding deps.taskStore. " + + "The task store provided in A2AServerConfig takes precedence.", + ); + } + this.deps = { ...deps, taskStore: this.config.taskStore ?? deps.taskStore ?? new InMemoryTaskStore(), diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts index 86999afeb..cfe9bf6f2 100644 --- a/packages/a2a-server/src/valkey-store.ts +++ b/packages/a2a-server/src/valkey-store.ts @@ -14,6 +14,13 @@ const VALKEY_GLIDE_REQUIRED = export async function createValkeyTaskStore( options: ValkeyTaskStoreOptions, ): Promise { + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } + let timeUnitSeconds: TimeUnit | undefined; if (options.ttlSeconds !== undefined) { try { @@ -26,16 +33,25 @@ export async function createValkeyTaskStore( return new ValkeyTaskStore(options, timeUnitSeconds); } +/** + * Valkey-backed implementation of {@link TaskStore}. + * + * **Important:** `activeCancellations` is an in-process `Set` and is **not** propagated across + * server instances sharing the same Valkey backend. For cross-instance cancellation, `A2AServer` + * would need to subscribe to a Valkey pub/sub channel for cancellation events instead of relying + * on process-local `AbortController` signaling. + */ export class ValkeyTaskStore implements TaskStore { private readonly client: GlideClient | GlideClusterClient; private readonly keyPrefix: string; private readonly ttlSeconds?: number; + /** @internal */ private timeUnitSeconds?: TimeUnit; // In-process only, not propagated across instances via Valkey. readonly activeCancellations = new Set(); - constructor(options: ValkeyTaskStoreOptions, timeUnitSeconds?: TimeUnit) { + constructor(options: ValkeyTaskStoreOptions, /** @internal */ timeUnitSeconds?: TimeUnit) { this.client = options.client; this.keyPrefix = options.keyPrefix ?? "a2a-tasks"; this.ttlSeconds = options.ttlSeconds; From 6a0d2023cc6608259335cc0b063b2a1ed499b05c Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 14:01:40 -0500 Subject: [PATCH 08/22] fix(resumable-streams): address eview findings for Valkey store - Simplify PubSubChannelModes.Exact cast to single optional-chain access - Await close() in subscribe and unsubscribe to prevent resource leaks - Use String(msg.message) instead of unsafe as-string cast in pub/sub callback - Add ttlSeconds validation guard in createResumableStreamValkeyStore - Document close() ownership semantics (caller retains main client) - Add @internal tags to four newly-exported helper functions Signed-off-by: Riley Des --- .../src/resumable-streams.ts | 4 +++ .../resumable-streams/src/valkey-store.ts | 27 ++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/packages/resumable-streams/src/resumable-streams.ts b/packages/resumable-streams/src/resumable-streams.ts index 97071a7e8..50544b426 100644 --- a/packages/resumable-streams/src/resumable-streams.ts +++ b/packages/resumable-streams/src/resumable-streams.ts @@ -84,6 +84,7 @@ const getResumableStreamDisabledInfo = (value: unknown) => { return { reason, docsUrl }; }; +/** @internal */ export const markResumableStreamStoreType = ( value: T, type: string, @@ -201,6 +202,7 @@ const buildStreamKey = ({ conversationId, userId }: ResumableStreamContext) => { return `${userId}-${conversationId}`; }; +/** @internal */ export const buildActiveStreamKey = (keyPrefix: string, context: ResumableStreamContext) => `${keyPrefix}:active:${buildStreamKey(context)}`; @@ -216,6 +218,7 @@ const buildActiveStreamQuery = (context: ResumableStreamContext, streamId?: stri return params.toString(); }; +/** @internal */ export const createActiveStreamStoreFromPublisher = ( publisher: ResumableStreamPublisher, keyPrefix: string, @@ -250,6 +253,7 @@ export const createActiveStreamStoreFromPublisher = ( }, }); +/** @internal */ export const mergeStreamAndActiveStore = ( streamStore: T, activeStreamStore: ResumableStreamActiveStore, diff --git a/packages/resumable-streams/src/valkey-store.ts b/packages/resumable-streams/src/valkey-store.ts index ba1be6d2e..5366fba6a 100644 --- a/packages/resumable-streams/src/valkey-store.ts +++ b/packages/resumable-streams/src/valkey-store.ts @@ -47,17 +47,15 @@ export async function createResumableStreamValkeyStore( try { const mod = await import("@valkey/valkey-glide"); GlideClientClass = mod.GlideClient; - // PubSubChannelModes isn't exported as a value type; cast to access the enum directly. - const configClass = mod.GlideClientConfiguration as unknown as Record; - const pubSubModes = (configClass as { PubSubChannelModes?: { Exact?: number } }) - ?.PubSubChannelModes; - if (pubSubModes?.Exact === undefined) { + // PubSubChannelModes isn't exported as a value type; use a single guarded access. + const PubSubExact = (mod as any).GlideClientConfiguration?.PubSubChannelModes?.Exact; + if (PubSubExact === undefined) { throw new Error( "GlideClientConfiguration.PubSubChannelModes.Exact is not available. " + "The installed version of @valkey/valkey-glide may be incompatible.", ); } - GlideClientConfigurationClass = configClass as { PubSubChannelModes: { Exact: number } }; + GlideClientConfigurationClass = { PubSubChannelModes: { Exact: PubSubExact } }; timeUnit = mod.TimeUnit; } catch (err) { if (err instanceof Error && err.message.includes("PubSubChannelModes")) { @@ -72,6 +70,13 @@ export async function createResumableStreamValkeyStore( const keyPrefix = options.keyPrefix ?? DEFAULT_KEY_PREFIX; const { client, clientConfig } = options; + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } + // Publisher adapter const publisher = { async connect() {}, @@ -108,7 +113,7 @@ export async function createResumableStreamValkeyStore( // Close any existing client for this channel to avoid resource leaks on duplicate calls const existing = subscriptionClients.get(channel); if (existing) { - existing.close(); + await existing.close(); subscriptionClients.delete(channel); } @@ -124,7 +129,7 @@ export async function createResumableStreamValkeyStore( channelsAndPatterns: { [GlideClientConfigurationClass.PubSubChannelModes.Exact]: new Set([channel]), }, - callback: (msg: { message: unknown }, _ctx: unknown) => callback(msg.message as string), + callback: (msg: { message: unknown }, _ctx: unknown) => callback(String(msg.message)), }, }); subscriptionClients.set(channel, subClient); @@ -132,7 +137,7 @@ export async function createResumableStreamValkeyStore( async unsubscribe(channel: string) { const subClient = subscriptionClients.get(channel); if (subClient) { - subClient.close(); + await subClient.close(); subscriptionClients.delete(channel); } }, @@ -170,6 +175,10 @@ export async function createResumableStreamValkeyStore( return { ...taggedStore, + /** + * Closes all internally-created subscription clients. The main `client` passed in + * `options` is **not** closed — the caller retains ownership of its lifecycle. + */ async close() { const closePromises: Promise[] = []; for (const subClient of subscriptionClients.values()) { From d625eba35856e37c0b9d82bf3db4982a63f8853c Mon Sep 17 00:00:00 2001 From: Riley Des Date: Thu, 30 Apr 2026 14:02:42 -0500 Subject: [PATCH 09/22] docs(examples): update changeset and example config for Valkey store PR - Document taskStore config precedence change in changeset description - Update example tsconfig moduleResolution from Node to NodeNext Signed-off-by: Riley Des --- .changeset/add-valkey-store-providers.md | 6 ++++++ examples/with-valkey-store/tsconfig.json | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.changeset/add-valkey-store-providers.md b/.changeset/add-valkey-store-providers.md index 6c9a26c88..3af242e0d 100644 --- a/.changeset/add-valkey-store-providers.md +++ b/.changeset/add-valkey-store-providers.md @@ -10,3 +10,9 @@ Adds `ValkeyTaskStore` to `@voltagent/a2a-server` and `createResumableStreamValk client library. Both stores support configurable key prefixes, optional TTL-based expiration, and standalone or cluster Valkey deployments. The `@valkey/valkey-glide` peer dependency is optional so consumers who don't use Valkey are unaffected. + +**Breaking change in `@voltagent/a2a-server`:** `A2AServerConfig` now accepts an optional +`taskStore` property. When provided, it takes precedence over the `deps.taskStore` argument passed +to `A2AServer.initialize()`. The full precedence chain is: +`config.taskStore` > `deps.taskStore` > `InMemoryTaskStore`. A debug-level log is emitted when +`config.taskStore` overrides a non-null `deps.taskStore`. diff --git a/examples/with-valkey-store/tsconfig.json b/examples/with-valkey-store/tsconfig.json index e961e4b7f..f58e64901 100644 --- a/examples/with-valkey-store/tsconfig.json +++ b/examples/with-valkey-store/tsconfig.json @@ -4,7 +4,7 @@ "rootDir": "src", "outDir": "dist", "module": "ESNext", - "moduleResolution": "Node", + "moduleResolution": "NodeNext", "target": "ES2022", "types": ["node"], "esModuleInterop": true From 66147dbea5c2e7c3ec079b9e272e22fc7ffca772 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 08:05:19 -0500 Subject: [PATCH 10/22] fix(resumable-streams): correct publish arg order and subscription handling - Swap publish(channel, message) to publish(message, channel) to match the GLIDE API signature - Use GlideClusterClient.createClient for subscription clients when the main client is a cluster instance - Add pendingSubscriptions guard to prevent race conditions and subscription limit bypass on concurrent subscribe calls - Simplify close() to call synchronous subClient.close() directly - Update test assertion for corrected publish argument order - Replace invalid .resolves.not.toThrow() matcher with direct await - Add docstrings to all exported interfaces, types, classes, methods, and factory functions Signed-off-by: Riley Des --- .../src/valkey-store.spec.ts | 6 +- .../resumable-streams/src/valkey-store.ts | 96 +++++++++++++++---- 2 files changed, 81 insertions(+), 21 deletions(-) diff --git a/packages/resumable-streams/src/valkey-store.spec.ts b/packages/resumable-streams/src/valkey-store.spec.ts index 17d38b4ff..9c4a8b164 100644 --- a/packages/resumable-streams/src/valkey-store.spec.ts +++ b/packages/resumable-streams/src/valkey-store.spec.ts @@ -99,7 +99,7 @@ describe("ValkeyResumableStreamStore — Publisher adapter", () => { const result = await publisher.publish("my-channel", "my-message"); - expect((client as any).publish).toHaveBeenCalledWith("my-channel", "my-message"); + expect((client as any).publish).toHaveBeenCalledWith("my-message", "my-channel"); expect(result).toBe(1); }); @@ -172,8 +172,8 @@ describe("ValkeyResumableStreamStore — Subscriber adapter", () => { expect(mockSubClient.close).toHaveBeenCalledTimes(1); - // Unsubscribing again should not throw (client already removed) - await expect(subscriber.unsubscribe("test-channel")).resolves.not.toThrow(); + // Unsubscribing again should resolve safely (client already removed) + await subscriber.unsubscribe("test-channel"); expect(mockSubClient.close).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/resumable-streams/src/valkey-store.ts b/packages/resumable-streams/src/valkey-store.ts index 5366fba6a..fb444ffe0 100644 --- a/packages/resumable-streams/src/valkey-store.ts +++ b/packages/resumable-streams/src/valkey-store.ts @@ -9,6 +9,13 @@ import type { ResumableStreamActiveStore, ResumableStreamStore } from "./types"; const DEFAULT_KEY_PREFIX = "resumable-stream"; +/** + * Connection configuration passed to the Valkey GLIDE client. + * + * At minimum, `addresses` must contain one `{ host, port }` entry. Additional + * properties (TLS, timeouts, etc.) are forwarded to the underlying GLIDE + * client constructor. + */ export interface ValkeyConnectionConfig { addresses: Array<{ host: string; port: number }>; useTLS?: boolean; @@ -17,10 +24,18 @@ export interface ValkeyConnectionConfig { [key: string]: unknown; } +/** + * Options for creating a Valkey-backed resumable stream store via + * {@link createResumableStreamValkeyStore}. + */ export interface ResumableStreamValkeyStoreOptions { + /** Valkey client instance (standalone {@link GlideClient} or {@link GlideClusterClient}). */ client: GlideClient | GlideClusterClient; + /** Connection config reused when creating per-channel subscription clients. */ clientConfig: ValkeyConnectionConfig; + /** Key prefix for all Valkey keys managed by this store. Defaults to `"resumable-stream"`. */ keyPrefix?: string; + /** Optional TTL in seconds applied to active-stream keys. Must be a positive finite number. */ // Applied to active stream keys only; stream data keys are managed by resumable-stream/generic ttlSeconds?: number; /** @@ -29,14 +44,33 @@ export interface ResumableStreamValkeyStoreOptions { * also caps the number of open connections. Defaults to 1000. */ maxSubscriptions?: number; + /** Optional callback (e.g. from a serverless runtime) to keep the process alive while background work completes. */ waitUntil?: ((promise: Promise) => void) | null; } +/** + * A resumable stream store backed by Valkey, combining stream creation/resumption + * with active-stream tracking and a {@link close} method for cleanup. + */ export type ValkeyResumableStreamStore = ResumableStreamStore & ResumableStreamActiveStore & { close(): Promise; }; +/** + * Creates a Valkey-backed resumable stream store. + * + * The returned store uses the provided {@link GlideClient} (or + * {@link GlideClusterClient}) for key-value and pub/sub operations required by + * the `resumable-stream/generic` library. Each pub/sub subscription creates a + * dedicated GLIDE client connection (required by the GLIDE pub/sub model). + * + * @param options - Store configuration including the Valkey client, connection + * config, optional key prefix, TTL, and subscription limits. + * @returns A {@link ValkeyResumableStreamStore} ready for use. + * @throws If `@valkey/valkey-glide` is not installed or is an incompatible version. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + */ export async function createResumableStreamValkeyStore( options: ResumableStreamValkeyStoreOptions, ): Promise { @@ -81,7 +115,7 @@ export async function createResumableStreamValkeyStore( const publisher = { async connect() {}, async publish(channel: string, message: string) { - return client.publish(channel, message); + return client.publish(message, channel); }, async set(key: string, value: string, setOptions?: { EX?: number }) { if (setOptions?.EX !== undefined) { @@ -103,9 +137,22 @@ export async function createResumableStreamValkeyStore( }, }; - // Subscriber adapter — one dedicated GlideClient per channel (Glide pub/sub requirement). + // Subscriber adapter — one dedicated client per channel (Glide pub/sub requirement). + // Detect whether the caller provided a cluster client so subscription clients match. + let GlideClusterClientClass: typeof GlideClusterClient | undefined; + try { + const mod = await import("@valkey/valkey-glide"); + GlideClusterClientClass = mod.GlideClusterClient; + } catch { + // Already handled above; GlideClusterClient is only needed for cluster mode. + } + const isClusterMode = + GlideClusterClientClass !== undefined && client instanceof GlideClusterClientClass; + const maxSubscriptions = options.maxSubscriptions ?? 1000; - const subscriptionClients = new Map(); + const subscriptionClients = new Map(); + // Guard against concurrent subscribe calls interleaving across awaits. + const pendingSubscriptions = new Set(); const subscriber = { async connect() {}, @@ -113,31 +160,46 @@ export async function createResumableStreamValkeyStore( // Close any existing client for this channel to avoid resource leaks on duplicate calls const existing = subscriptionClients.get(channel); if (existing) { - await existing.close(); + existing.close(); subscriptionClients.delete(channel); } - if (subscriptionClients.size >= maxSubscriptions) { + if (pendingSubscriptions.has(channel)) { + throw new Error(`A subscription for channel "${channel}" is already being established.`); + } + + if (subscriptionClients.size + pendingSubscriptions.size >= maxSubscriptions) { throw new Error( `Maximum subscription limit (${maxSubscriptions}) reached. Unsubscribe from existing channels before subscribing to new ones.`, ); } - const subClient = await GlideClientClass.createClient({ - ...clientConfig, - pubsubSubscriptions: { - channelsAndPatterns: { - [GlideClientConfigurationClass.PubSubChannelModes.Exact]: new Set([channel]), + pendingSubscriptions.add(channel); + try { + const pubsubConfig = { + ...clientConfig, + pubsubSubscriptions: { + channelsAndPatterns: { + [GlideClientConfigurationClass.PubSubChannelModes.Exact]: new Set([channel]), + }, + callback: (msg: { message: unknown }, _ctx: unknown) => callback(String(msg.message)), }, - callback: (msg: { message: unknown }, _ctx: unknown) => callback(String(msg.message)), - }, - }); - subscriptionClients.set(channel, subClient); + }; + + const subClient = + isClusterMode && GlideClusterClientClass + ? await GlideClusterClientClass.createClient(pubsubConfig) + : await GlideClientClass.createClient(pubsubConfig); + + subscriptionClients.set(channel, subClient); + } finally { + pendingSubscriptions.delete(channel); + } }, async unsubscribe(channel: string) { const subClient = subscriptionClients.get(channel); if (subClient) { - await subClient.close(); + subClient.close(); subscriptionClients.delete(channel); } }, @@ -180,12 +242,10 @@ export async function createResumableStreamValkeyStore( * `options` is **not** closed — the caller retains ownership of its lifecycle. */ async close() { - const closePromises: Promise[] = []; for (const subClient of subscriptionClients.values()) { - closePromises.push(Promise.resolve(subClient.close())); + subClient.close(); } subscriptionClients.clear(); - await Promise.all(closePromises); }, }; } From ec417c0b51cb778e41a64d5e4457a6afcfc3d2c3 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 08:05:49 -0500 Subject: [PATCH 11/22] fix(a2a-server): add TTL validation and key escaping to ValkeyTaskStore - Validate ttlSeconds in the ValkeyTaskStore constructor so direct instantiation has the same guards as createValkeyTaskStore - Escape colons in agentId and taskId within makeKey to prevent delimiter-based key collisions - Add docstrings to all exported interfaces, types, classes, methods, and factory functions Signed-off-by: Riley Des --- packages/a2a-server/src/valkey-store.ts | 66 ++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts index cfe9bf6f2..05d79d248 100644 --- a/packages/a2a-server/src/valkey-store.ts +++ b/packages/a2a-server/src/valkey-store.ts @@ -2,15 +2,33 @@ import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-g import { safeStringify } from "@voltagent/internal"; import type { TaskRecord, TaskStore } from "./types"; +/** + * Configuration options for {@link ValkeyTaskStore}. + */ export interface ValkeyTaskStoreOptions { + /** Valkey client instance (standalone {@link GlideClient} or {@link GlideClusterClient}). */ client: GlideClient | GlideClusterClient; + /** Key prefix for all task records stored in Valkey. Defaults to `"a2a-tasks"`. */ keyPrefix?: string; + /** Optional TTL in seconds applied to every persisted task record. Must be a positive finite number. */ ttlSeconds?: number; } const VALKEY_GLIDE_REQUIRED = "@valkey/valkey-glide is required for ValkeyTaskStore. Install it with: pnpm add @valkey/valkey-glide"; +/** + * Creates a {@link ValkeyTaskStore} with eagerly-resolved Valkey dependencies. + * + * Validates `ttlSeconds` and pre-resolves the `TimeUnit.Seconds` enum from + * `@valkey/valkey-glide` so that subsequent `save()` calls do not need to + * perform a dynamic import. + * + * @param options - Store configuration including the Valkey client, optional key prefix, and TTL. + * @returns A fully initialised {@link ValkeyTaskStore} instance. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + * @throws If `@valkey/valkey-glide` is not installed when `ttlSeconds` is set. + */ export async function createValkeyTaskStore( options: ValkeyTaskStoreOptions, ): Promise { @@ -51,13 +69,34 @@ export class ValkeyTaskStore implements TaskStore { // In-process only, not propagated across instances via Valkey. readonly activeCancellations = new Set(); + /** + * Creates a new ValkeyTaskStore. + * + * Prefer {@link createValkeyTaskStore} which eagerly resolves the Valkey + * `TimeUnit` dependency. Direct construction is supported but the caller + * must supply the resolved `timeUnitSeconds` when `ttlSeconds` is used. + * + * @param options - Store configuration. + * @param timeUnitSeconds - Pre-resolved `TimeUnit.Seconds` value from `@valkey/valkey-glide`. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + */ constructor(options: ValkeyTaskStoreOptions, /** @internal */ timeUnitSeconds?: TimeUnit) { + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } this.client = options.client; this.keyPrefix = options.keyPrefix ?? "a2a-tasks"; this.ttlSeconds = options.ttlSeconds; this.timeUnitSeconds = timeUnitSeconds; } + /** + * Lazily resolves and caches the `TimeUnit.Seconds` enum value from + * `@valkey/valkey-glide`. Called internally by {@link save} when TTL is configured. + */ private async getTimeUnitSeconds(): Promise { if (this.timeUnitSeconds !== undefined) return this.timeUnitSeconds; try { @@ -69,6 +108,13 @@ export class ValkeyTaskStore implements TaskStore { } } + /** + * Loads a task record from Valkey by agent and task ID. + * + * @param params - The agent ID and task ID identifying the record. + * @returns The deserialised {@link TaskRecord}, or `null` if not found. + * @throws If the stored value cannot be parsed as valid JSON. + */ async load(params: { agentId: string; taskId: string }): Promise { const key = this.makeKey(params.agentId, params.taskId); const result = await this.client.get(key); @@ -81,6 +127,15 @@ export class ValkeyTaskStore implements TaskStore { } } + /** + * Persists a task record to Valkey. + * + * The record is serialised with {@link safeStringify} and stored under a + * composite key derived from the agent ID and the record's task ID. When + * `ttlSeconds` is configured the key is set with an expiry. + * + * @param params - The agent ID and the {@link TaskRecord} to persist. + */ async save(params: { agentId: string; data: TaskRecord }): Promise { const key = this.makeKey(params.agentId, params.data.id); const json = safeStringify(params.data); @@ -95,7 +150,16 @@ export class ValkeyTaskStore implements TaskStore { } } + /** + * Builds the Valkey key for a given agent/task pair. + * + * Colons inside `agentId` and `taskId` are escaped to prevent collisions + * with the `keyPrefix:agentId::taskId` delimiter scheme. + */ private makeKey(agentId: string, taskId: string): string { - return `${this.keyPrefix}:${agentId}::${taskId}`; + // Escape colons in user-provided IDs to prevent key collisions with the delimiter. + const safeAgentId = agentId.replace(/:/g, "\\:"); + const safeTaskId = taskId.replace(/:/g, "\\:"); + return `${this.keyPrefix}:${safeAgentId}::${safeTaskId}`; } } From 5081830e12e88b6f83f4d94ffc52d0f27202d5dd Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 08:06:08 -0500 Subject: [PATCH 12/22] fix(changeset): bump @voltagent/a2a-server to major for breaking change The changeset body describes a breaking change to A2AServerConfig but the version was marked as minor. Signed-off-by: Riley Des --- .changeset/add-valkey-store-providers.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/add-valkey-store-providers.md b/.changeset/add-valkey-store-providers.md index 3af242e0d..0947d5885 100644 --- a/.changeset/add-valkey-store-providers.md +++ b/.changeset/add-valkey-store-providers.md @@ -1,5 +1,5 @@ --- -"@voltagent/a2a-server": minor +"@voltagent/a2a-server": major "@voltagent/resumable-streams": minor --- From c68b0e1f057cc89e9d72efb3650dde467b573bc2 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 08:06:23 -0500 Subject: [PATCH 13/22] refactor(a2a-server): use structured logger and narrow valkey exports - Replace console.debug with getGlobalLogger() in A2AServer.initialize - Replace wildcard re-export of valkey-store with explicit named exports to avoid leaking @valkey/valkey-glide types to non-Valkey consumers Signed-off-by: Riley Des --- packages/a2a-server/src/index.ts | 3 ++- packages/a2a-server/src/server.ts | 12 +++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/a2a-server/src/index.ts b/packages/a2a-server/src/index.ts index 11b92af4c..521a663da 100644 --- a/packages/a2a-server/src/index.ts +++ b/packages/a2a-server/src/index.ts @@ -2,7 +2,8 @@ export * from "./types"; export * from "./server"; export * from "./protocol"; export * from "./store"; -export * from "./valkey-store"; +export { createValkeyTaskStore, ValkeyTaskStore } from "./valkey-store"; +export type { ValkeyTaskStoreOptions } from "./valkey-store"; export * from "./tasks"; export * from "./adapters/agent"; export * from "./adapters/message"; diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index 02e166bf2..e2e948ed7 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -1,5 +1,5 @@ import { randomUUID } from "node:crypto"; -import { type Agent, convertUsage } from "@voltagent/core"; +import { type Agent, convertUsage, getGlobalLogger } from "@voltagent/core"; import { buildAgentCard } from "./adapters/agent"; import { fromVoltAgentMessage, toVoltAgentMessage } from "./adapters/message"; import { createSuccessResponse, normalizeError } from "./protocol"; @@ -85,10 +85,12 @@ export class A2AServer { */ initialize(deps: A2AServerDeps): void { if (this.config.taskStore && deps.taskStore) { - console.debug( - "[A2AServer] config.taskStore is overriding deps.taskStore. " + - "The task store provided in A2AServerConfig takes precedence.", - ); + getGlobalLogger() + .child({ component: "a2a-server" }) + .debug( + "config.taskStore is overriding deps.taskStore. " + + "The task store provided in A2AServerConfig takes precedence.", + ); } this.deps = { From a3675e2121221c6d4e76eb5fb97a4d96030720c0 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 08:07:19 -0500 Subject: [PATCH 14/22] fix(examples): fix tsconfig, port validation, and README lint - Change module from ESNext to NodeNext in tsconfig - Add VALKEY_PORT validation (integer, 1-65535 range) - Add text language identifier to fenced code block (MD040) Signed-off-by: Riley Des --- examples/with-valkey-store/README.md | 2 +- examples/with-valkey-store/src/index.ts | 12 +++++++++++- examples/with-valkey-store/tsconfig.json | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/examples/with-valkey-store/README.md b/examples/with-valkey-store/README.md index 063428b59..71b8999d5 100644 --- a/examples/with-valkey-store/README.md +++ b/examples/with-valkey-store/README.md @@ -47,7 +47,7 @@ This example demonstrates how to use **Valkey** as a distributed backing store f ## Structure -``` +```text examples/with-valkey-store ├── src/ │ ├── agents/assistant.ts # Example agent definition diff --git a/examples/with-valkey-store/src/index.ts b/examples/with-valkey-store/src/index.ts index 63dbc49cc..037aa8a6a 100644 --- a/examples/with-valkey-store/src/index.ts +++ b/examples/with-valkey-store/src/index.ts @@ -15,8 +15,18 @@ const logger = createPinoLogger({ }); const host = process.env.VALKEY_HOST ?? "localhost"; -const port = Number(process.env.VALKEY_PORT ?? 6379); +const rawPort = process.env.VALKEY_PORT; +const port = rawPort !== undefined ? Number(rawPort) : 6379; +if (!Number.isInteger(port) || port < 1 || port > 65535) { + throw new Error(`Invalid VALKEY_PORT "${rawPort}": must be an integer between 1 and 65535`); +} +/** + * Bootstraps a VoltAgent instance backed by Valkey for both A2A task + * persistence and resumable streaming. Connects to the Valkey server + * specified by `VALKEY_HOST` / `VALKEY_PORT` environment variables + * (defaulting to `localhost:6379`), then starts an HTTP server on port 3141. + */ async function main() { const valkeyClient = await GlideClient.createClient({ addresses: [{ host, port }], diff --git a/examples/with-valkey-store/tsconfig.json b/examples/with-valkey-store/tsconfig.json index f58e64901..811b07146 100644 --- a/examples/with-valkey-store/tsconfig.json +++ b/examples/with-valkey-store/tsconfig.json @@ -3,7 +3,7 @@ "compilerOptions": { "rootDir": "src", "outDir": "dist", - "module": "ESNext", + "module": "NodeNext", "moduleResolution": "NodeNext", "target": "ES2022", "types": ["node"], From d418794b4755004ad012b7192e975c46466b88f8 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 11:05:19 -0500 Subject: [PATCH 15/22] refactor(a2a-server): improve Zod schema integration and test coverage - Add missing test for Zod validation failure on malformed TaskRecord - Use safeStringify(validation.error.issues) for structured error output - Document intentional save/load validation asymmetry in JSDoc Signed-off-by: Riley Des --- packages/a2a-server/src/schemas.ts | 68 +++++++++++++++++ packages/a2a-server/src/types.ts | 77 ++++++++------------ packages/a2a-server/src/valkey-store.spec.ts | 12 +++ packages/a2a-server/src/valkey-store.ts | 19 ++++- 4 files changed, 127 insertions(+), 49 deletions(-) create mode 100644 packages/a2a-server/src/schemas.ts diff --git a/packages/a2a-server/src/schemas.ts b/packages/a2a-server/src/schemas.ts new file mode 100644 index 000000000..43310c99a --- /dev/null +++ b/packages/a2a-server/src/schemas.ts @@ -0,0 +1,68 @@ +import { z } from "zod"; + +/** + * Zod schemas for A2A task-related types. + * + * These are the **single source of truth** — the corresponding TypeScript + * types in `./types.ts` are derived via `z.infer` so the runtime validation + * and static types can never drift apart. + */ + +export const TaskStateSchema = z.enum([ + "submitted", + "working", + "input-required", + "completed", + "failed", + "canceled", +]); + +export const A2AMessagePartTextSchema = z.object({ + kind: z.literal("text"), + text: z.string(), +}); + +/** + * Currently only `text` parts exist. When new part kinds are added, extend + * this with `z.discriminatedUnion("kind", [...])`. + */ +export const A2AMessagePartSchema = A2AMessagePartTextSchema; + +export const A2AMessageSchema = z.object({ + kind: z.literal("message"), + role: z.enum(["user", "agent"]), + messageId: z.string(), + parts: z.array(A2AMessagePartSchema), + contextId: z.string().optional(), + taskId: z.string().optional(), + referenceTaskIds: z.array(z.string()).optional(), + extensions: z.array(z.string()).optional(), + metadata: z.record(z.unknown()).optional(), +}); + +export const TaskStatusSchema = z.object({ + state: TaskStateSchema, + message: A2AMessageSchema.optional(), + timestamp: z.string(), +}); + +export const TaskArtifactPartSchema = z.object({ + kind: z.literal("text"), + text: z.string(), +}); + +export const TaskArtifactSchema = z.object({ + name: z.string(), + parts: z.array(TaskArtifactPartSchema), + description: z.string().optional(), + metadata: z.record(z.unknown()).optional(), +}); + +export const TaskRecordSchema = z.object({ + id: z.string(), + contextId: z.string(), + status: TaskStatusSchema, + history: z.array(A2AMessageSchema), + artifacts: z.array(TaskArtifactSchema).optional(), + metadata: z.record(z.unknown()).optional(), +}); diff --git a/packages/a2a-server/src/types.ts b/packages/a2a-server/src/types.ts index 3c8d240e2..a9cebda0d 100644 --- a/packages/a2a-server/src/types.ts +++ b/packages/a2a-server/src/types.ts @@ -4,6 +4,17 @@ import type { A2AServerLike as BaseA2AServerLike, A2AServerMetadata as BaseA2AServerMetadata, } from "@voltagent/internal/a2a"; +import type { z } from "zod"; +import type { + A2AMessagePartSchema, + A2AMessagePartTextSchema, + A2AMessageSchema, + TaskArtifactPartSchema, + TaskArtifactSchema, + TaskRecordSchema, + TaskStateSchema, + TaskStatusSchema, +} from "./schemas"; export type A2AJsonRpcId = string | number | null; @@ -37,59 +48,29 @@ export interface JsonRpcRequest { params?: Params; } -export type TaskState = - | "submitted" - | "working" - | "input-required" - | "completed" - | "failed" - | "canceled"; - -export interface A2AMessagePartText { - kind: "text"; - text: string; -} +/** The set of valid task lifecycle states. Derived from {@link TaskStateSchema}. */ +export type TaskState = z.infer; -export type A2AMessagePart = A2AMessagePartText; - -export interface A2AMessage { - kind: "message"; - role: "user" | "agent"; - messageId: string; - parts: A2AMessagePart[]; - contextId?: string; - taskId?: string; - referenceTaskIds?: string[]; - extensions?: string[]; - metadata?: Record; -} +/** A text-only message part. Derived from {@link A2AMessagePartTextSchema}. */ +export type A2AMessagePartText = z.infer; -export interface TaskStatus { - state: TaskState; - message?: A2AMessage; - timestamp: string; -} +/** Union of all message part kinds. Derived from {@link A2AMessagePartSchema}. */ +export type A2AMessagePart = z.infer; -export interface TaskArtifactPart { - kind: "text"; - text: string; -} +/** A single message exchanged between user and agent. Derived from {@link A2AMessageSchema}. */ +export type A2AMessage = z.infer; -export interface TaskArtifact { - name: string; - parts: TaskArtifactPart[]; - description?: string; - metadata?: Record; -} +/** Current status of a task including lifecycle state and timestamp. Derived from {@link TaskStatusSchema}. */ +export type TaskStatus = z.infer; -export interface TaskRecord { - id: string; - contextId: string; - status: TaskStatus; - history: A2AMessage[]; - artifacts?: TaskArtifact[]; - metadata?: Record; -} +/** A text-only artifact part. Derived from {@link TaskArtifactPartSchema}. */ +export type TaskArtifactPart = z.infer; + +/** An artifact produced by an agent during task execution. Derived from {@link TaskArtifactSchema}. */ +export type TaskArtifact = z.infer; + +/** A complete task record including status, history, and optional artifacts. Derived from {@link TaskRecordSchema}. */ +export type TaskRecord = z.infer; export interface TaskStore { load(params: { agentId: string; taskId: string }): Promise; diff --git a/packages/a2a-server/src/valkey-store.spec.ts b/packages/a2a-server/src/valkey-store.spec.ts index 9061e9093..bb09fedcc 100644 --- a/packages/a2a-server/src/valkey-store.spec.ts +++ b/packages/a2a-server/src/valkey-store.spec.ts @@ -1,4 +1,5 @@ import { safeStringify } from "@voltagent/internal"; +import { TaskRecordSchema } from "./schemas"; import type { TaskRecord } from "./types"; import { ValkeyTaskStore, createValkeyTaskStore } from "./valkey-store"; @@ -137,6 +138,17 @@ describe("ValkeyTaskStore", () => { ); }); + it("load() throws when stored data is valid JSON but fails schema validation", async () => { + const client = makeClient(); + // Valid JSON but missing required TaskRecord fields (id, contextId, status, history) + client.get.mockResolvedValue(safeStringify({ bogus: true })); + + const store = new ValkeyTaskStore({ client, keyPrefix: "pfx" } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + /Invalid TaskRecord for key "pfx:agent-1::task-1"/, + ); + }); + it("save() propagates errors thrown by client.set", async () => { const client = makeClient(); client.set.mockRejectedValue(new Error("write timeout")); diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts index 05d79d248..14c88a42d 100644 --- a/packages/a2a-server/src/valkey-store.ts +++ b/packages/a2a-server/src/valkey-store.ts @@ -1,5 +1,6 @@ import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; import { safeStringify } from "@voltagent/internal"; +import { TaskRecordSchema } from "./schemas"; import type { TaskRecord, TaskStore } from "./types"; /** @@ -119,12 +120,23 @@ export class ValkeyTaskStore implements TaskStore { const key = this.makeKey(params.agentId, params.taskId); const result = await this.client.get(key); if (result === null) return null; + + let parsed: unknown; try { - return JSON.parse(String(result)) as TaskRecord; + parsed = JSON.parse(String(result)); } catch (error) { const detail = error instanceof Error ? error.message : "Unknown error"; throw new Error(`Failed to parse stored TaskRecord for key "${key}": ${detail}`); } + + const validation = TaskRecordSchema.safeParse(parsed); + if (!validation.success) { + throw new Error( + `Invalid TaskRecord for key "${key}": ${safeStringify(validation.error.issues)}`, + ); + } + + return validation.data; } /** @@ -134,6 +146,11 @@ export class ValkeyTaskStore implements TaskStore { * composite key derived from the agent ID and the record's task ID. When * `ttlSeconds` is configured the key is set with an expiry. * + * Note: unlike {@link load}, `save` does **not** run Zod validation. The + * caller is trusted to supply a well-typed `TaskRecord`, and skipping + * validation on the write path avoids the per-call overhead. Any schema + * drift will surface on the next `load()`. + * * @param params - The agent ID and the {@link TaskRecord} to persist. */ async save(params: { agentId: string; data: TaskRecord }): Promise { From ccb3a127b98ca8b23eaf2934dd5634f0fd598ec8 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 11:47:43 -0500 Subject: [PATCH 16/22] refactor(resumable-streams,a2a-server): move Valkey exports to sub-path entry point Move Valkey store types and factory functions from the main package entry point to a dedicated `./valkey-store` sub-path export. This prevents TypeScript consumers who don't use Valkey from needing `@valkey/valkey-glide` installed for type resolution, keeping it correctly optional. - Add `src/valkey-store.ts` as a second tsup entry point - Add `./valkey-store` export map in package.json - Remove Valkey re-exports from `src/index.ts` Signed-off-by: Riley Des --- packages/a2a-server/package.json | 10 ++++++++++ packages/a2a-server/src/index.ts | 2 -- packages/a2a-server/tsup.config.ts | 2 +- packages/resumable-streams/package.json | 10 ++++++++++ packages/resumable-streams/src/index.ts | 6 ------ packages/resumable-streams/tsup.config.ts | 2 +- 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/packages/a2a-server/package.json b/packages/a2a-server/package.json index 17e989978..34f6df3fc 100644 --- a/packages/a2a-server/package.json +++ b/packages/a2a-server/package.json @@ -20,6 +20,16 @@ "types": "./dist/index.d.ts", "default": "./dist/index.js" } + }, + "./valkey-store": { + "import": { + "types": "./dist/valkey-store.d.mts", + "default": "./dist/valkey-store.mjs" + }, + "require": { + "types": "./dist/valkey-store.d.ts", + "default": "./dist/valkey-store.js" + } } }, "files": [ diff --git a/packages/a2a-server/src/index.ts b/packages/a2a-server/src/index.ts index 521a663da..673b8a56b 100644 --- a/packages/a2a-server/src/index.ts +++ b/packages/a2a-server/src/index.ts @@ -2,8 +2,6 @@ export * from "./types"; export * from "./server"; export * from "./protocol"; export * from "./store"; -export { createValkeyTaskStore, ValkeyTaskStore } from "./valkey-store"; -export type { ValkeyTaskStoreOptions } from "./valkey-store"; export * from "./tasks"; export * from "./adapters/agent"; export * from "./adapters/message"; diff --git a/packages/a2a-server/tsup.config.ts b/packages/a2a-server/tsup.config.ts index 0819104fd..ff573024e 100644 --- a/packages/a2a-server/tsup.config.ts +++ b/packages/a2a-server/tsup.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "tsup"; import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; export default defineConfig({ - entry: ["src/index.ts"], + entry: ["src/index.ts", "src/valkey-store.ts"], format: ["cjs", "esm"], splitting: false, sourcemap: true, diff --git a/packages/resumable-streams/package.json b/packages/resumable-streams/package.json index 8efce75de..369d69bc8 100644 --- a/packages/resumable-streams/package.json +++ b/packages/resumable-streams/package.json @@ -20,6 +20,16 @@ "types": "./dist/index.d.ts", "default": "./dist/index.js" } + }, + "./valkey-store": { + "import": { + "types": "./dist/valkey-store.d.mts", + "default": "./dist/valkey-store.mjs" + }, + "require": { + "types": "./dist/valkey-store.d.ts", + "default": "./dist/valkey-store.js" + } } }, "files": [ diff --git a/packages/resumable-streams/src/index.ts b/packages/resumable-streams/src/index.ts index 2deda4dd5..9e47ff561 100644 --- a/packages/resumable-streams/src/index.ts +++ b/packages/resumable-streams/src/index.ts @@ -24,9 +24,3 @@ export { type ResumableChatHandlersOptions, } from "./chat-handlers"; export { createResumableChatSession, type ResumableChatSession } from "./chat-session"; -export { createResumableStreamValkeyStore } from "./valkey-store"; -export type { - ResumableStreamValkeyStoreOptions, - ValkeyConnectionConfig, - ValkeyResumableStreamStore, -} from "./valkey-store"; diff --git a/packages/resumable-streams/tsup.config.ts b/packages/resumable-streams/tsup.config.ts index e019584d6..c2af14e7f 100644 --- a/packages/resumable-streams/tsup.config.ts +++ b/packages/resumable-streams/tsup.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "tsup"; import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; export default defineConfig({ - entry: ["src/index.ts"], + entry: ["src/index.ts", "src/valkey-store.ts"], format: ["cjs", "esm"], splitting: false, sourcemap: true, From d0ae0e9a8c2d8761fc22927631a331717e6f4139 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 11:48:31 -0500 Subject: [PATCH 17/22] fix(examples): update with-valkey-store imports for sub-path exports Update imports to use the new `./valkey-store` sub-path exports from `@voltagent/a2a-server` and `@voltagent/resumable-streams`. Fix missing `.js` extension on relative import required by NodeNext module resolution. Signed-off-by: Riley Des --- examples/with-valkey-store/src/index.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/examples/with-valkey-store/src/index.ts b/examples/with-valkey-store/src/index.ts index 037aa8a6a..e25bea6e5 100644 --- a/examples/with-valkey-store/src/index.ts +++ b/examples/with-valkey-store/src/index.ts @@ -1,13 +1,12 @@ import { GlideClient } from "@valkey/valkey-glide"; -import { A2AServer, createValkeyTaskStore } from "@voltagent/a2a-server"; +import { A2AServer } from "@voltagent/a2a-server"; +import { createValkeyTaskStore } from "@voltagent/a2a-server/valkey-store"; import { VoltAgent } from "@voltagent/core"; import { createPinoLogger } from "@voltagent/logger"; -import { - createResumableStreamAdapter, - createResumableStreamValkeyStore, -} from "@voltagent/resumable-streams"; +import { createResumableStreamAdapter } from "@voltagent/resumable-streams"; +import { createResumableStreamValkeyStore } from "@voltagent/resumable-streams/valkey-store"; import { honoServer } from "@voltagent/server-hono"; -import { assistant } from "./agents/assistant"; +import { assistant } from "./agents/assistant.js"; const logger = createPinoLogger({ name: "with-valkey-store", From a80210c9525f9e82d79599a301dcabac32fe717e Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 11:52:46 -0500 Subject: [PATCH 18/22] fix(a2a-server): normalize task ID in ValkeyTaskStore.save to prevent undefined keys Signed-off-by: Riley Des --- packages/a2a-server/src/valkey-store.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts index 14c88a42d..2abce5ea6 100644 --- a/packages/a2a-server/src/valkey-store.ts +++ b/packages/a2a-server/src/valkey-store.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; import { safeStringify } from "@voltagent/internal"; import { TaskRecordSchema } from "./schemas"; @@ -154,8 +155,11 @@ export class ValkeyTaskStore implements TaskStore { * @param params - The agent ID and the {@link TaskRecord} to persist. */ async save(params: { agentId: string; data: TaskRecord }): Promise { - const key = this.makeKey(params.agentId, params.data.id); - const json = safeStringify(params.data); + const taskId = params.data.id ?? randomUUID(); + const normalized: TaskRecord = + taskId === params.data.id ? params.data : { ...params.data, id: taskId }; + const key = this.makeKey(params.agentId, taskId); + const json = safeStringify(normalized); if (this.ttlSeconds !== undefined) { const seconds = await this.getTimeUnitSeconds(); From ff0af2de15fde15358b16cfbe7bf259c1465e0a5 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Fri, 1 May 2026 11:59:45 -0500 Subject: [PATCH 19/22] fix(a2a-server): detect safeStringify error sentinel before persisting task record Signed-off-by: Riley Des --- packages/a2a-server/src/valkey-store.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts index 2abce5ea6..0ec5dd13c 100644 --- a/packages/a2a-server/src/valkey-store.ts +++ b/packages/a2a-server/src/valkey-store.ts @@ -161,6 +161,12 @@ export class ValkeyTaskStore implements TaskStore { const key = this.makeKey(params.agentId, taskId); const json = safeStringify(normalized); + if (json.startsWith("SAFE_STRINGIFY_ERROR:")) { + throw new Error( + `Failed to serialize TaskRecord for agent "${params.agentId}", task "${taskId}": ${json}`, + ); + } + if (this.ttlSeconds !== undefined) { const seconds = await this.getTimeUnitSeconds(); await this.client.set(key, json, { From 0fca0dd30b5fee766f6b0b5882f3eb8b630fc48f Mon Sep 17 00:00:00 2001 From: Riley Des Date: Mon, 4 May 2026 08:18:19 -0500 Subject: [PATCH 20/22] docs(a2a-server,resumable-streams): add JSDoc comments to schemas, types, and server - Add JSDoc documentation to all exported Zod schemas in schemas.ts - Add JSDoc documentation to type definitions and interfaces in types.ts - Add JSDoc documentation to A2AServer class and public methods - Add JSDoc documentation to example agent configuration Signed-off-by: Riley Des --- .../with-valkey-store/src/agents/assistant.ts | 1 + packages/a2a-server/src/schemas.ts | 12 ++-- packages/a2a-server/src/server.ts | 15 +++++ packages/a2a-server/src/types.ts | 40 +++++++++++ .../src/resumable-streams.ts | 67 +++++++++++++++++++ 5 files changed, 131 insertions(+), 4 deletions(-) diff --git a/examples/with-valkey-store/src/agents/assistant.ts b/examples/with-valkey-store/src/agents/assistant.ts index 95196b2ea..a5e74823b 100644 --- a/examples/with-valkey-store/src/agents/assistant.ts +++ b/examples/with-valkey-store/src/agents/assistant.ts @@ -12,6 +12,7 @@ const statusTool = createTool({ }, }); +/** Pre-configured support agent with a `status` tool that returns the current time. */ export const assistant = new Agent({ id: "supportagent", name: "SupportAgent", diff --git a/packages/a2a-server/src/schemas.ts b/packages/a2a-server/src/schemas.ts index 43310c99a..cf2cb86ad 100644 --- a/packages/a2a-server/src/schemas.ts +++ b/packages/a2a-server/src/schemas.ts @@ -8,6 +8,7 @@ import { z } from "zod"; * and static types can never drift apart. */ +/** Zod schema for the set of valid task lifecycle states. */ export const TaskStateSchema = z.enum([ "submitted", "working", @@ -17,17 +18,16 @@ export const TaskStateSchema = z.enum([ "canceled", ]); +/** Zod schema for a text-only message part. */ export const A2AMessagePartTextSchema = z.object({ kind: z.literal("text"), text: z.string(), }); -/** - * Currently only `text` parts exist. When new part kinds are added, extend - * this with `z.discriminatedUnion("kind", [...])`. - */ +/** Zod schema for a message part. Currently only `text` parts exist; extend with `z.discriminatedUnion` when new kinds are added. */ export const A2AMessagePartSchema = A2AMessagePartTextSchema; +/** Zod schema for a single A2A message (user or agent). */ export const A2AMessageSchema = z.object({ kind: z.literal("message"), role: z.enum(["user", "agent"]), @@ -40,17 +40,20 @@ export const A2AMessageSchema = z.object({ metadata: z.record(z.unknown()).optional(), }); +/** Zod schema for a task's current status including state and timestamp. */ export const TaskStatusSchema = z.object({ state: TaskStateSchema, message: A2AMessageSchema.optional(), timestamp: z.string(), }); +/** Zod schema for a text-only artifact part. */ export const TaskArtifactPartSchema = z.object({ kind: z.literal("text"), text: z.string(), }); +/** Zod schema for an artifact produced by an agent during task execution. */ export const TaskArtifactSchema = z.object({ name: z.string(), parts: z.array(TaskArtifactPartSchema), @@ -58,6 +61,7 @@ export const TaskArtifactSchema = z.object({ metadata: z.record(z.unknown()).optional(), }); +/** Zod schema for a complete task record including status, history, and optional artifacts. */ export const TaskRecordSchema = z.object({ id: z.string(), contextId: z.string(), diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index e2e948ed7..71f3ae8aa 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -55,6 +55,13 @@ function resolveAgentCardUrl(serverId: string, requestUrl?: string): string { } } +/** + * A2A (Agent-to-Agent) protocol server. + * + * Manages agent registration, JSON-RPC request routing, task lifecycle, and + * streaming responses. Call {@link initialize} with runtime dependencies + * before handling any requests. + */ export class A2AServer { private deps?: Required; private readonly config: A2AServerConfig; @@ -62,6 +69,7 @@ export class A2AServer { private readonly configuredAgents = new Map(); private readonly agentFilter: A2AFilterFunction | undefined; + /** Creates a new A2AServer from the given configuration, registering any pre-configured agents. */ constructor(config: A2AServerConfig) { this.config = config; this.agentFilter = config.filterAgents; @@ -99,6 +107,7 @@ export class A2AServer { } as Required; } + /** Returns the server's public metadata (id, name, version, description, provider). */ getMetadata() { return { id: this.config.id, @@ -109,6 +118,7 @@ export class A2AServer { }; } + /** Builds and returns the {@link AgentCard} for the specified agent, including its endpoint URL. */ getAgentCard(agentId: string, context: A2ARequestContext = {}): AgentCard { const agent = this.resolveAgent(agentId, context); const url = resolveAgentCardUrl(agentId, context.requestUrl); @@ -124,6 +134,11 @@ export class A2AServer { }); } + /** + * Routes an incoming JSON-RPC request to the appropriate handler. + * + * Supported methods: `message/send`, `message/stream`, `tasks/get`, `tasks/cancel`. + */ async handleRequest( agentId: string, request: JsonRpcRequest, diff --git a/packages/a2a-server/src/types.ts b/packages/a2a-server/src/types.ts index a9cebda0d..8980bc9ec 100644 --- a/packages/a2a-server/src/types.ts +++ b/packages/a2a-server/src/types.ts @@ -16,14 +16,17 @@ import type { TaskStatusSchema, } from "./schemas"; +/** Identifier for a JSON-RPC request — a string, number, or `null` for notifications. */ export type A2AJsonRpcId = string | number | null; +/** Standard JSON-RPC 2.0 error object returned inside a {@link JsonRpcResponse}. */ export interface JsonRpcError { code: number; message: string; data?: Data; } +/** Standard JSON-RPC 2.0 response envelope. */ export interface JsonRpcResponse { jsonrpc: "2.0"; id: A2AJsonRpcId; @@ -31,16 +34,19 @@ export interface JsonRpcResponse { error?: JsonRpcError | null; } +/** Wrapper around an async generator that yields {@link JsonRpcResponse} objects for streaming calls. */ export interface JsonRpcStream { kind: "stream"; id: A2AJsonRpcId; stream: AsyncGenerator>; } +/** Discriminated union of a single {@link JsonRpcResponse} or a {@link JsonRpcStream}. */ export type JsonRpcHandlerResult = | JsonRpcResponse | JsonRpcStream; +/** Incoming JSON-RPC 2.0 request envelope. */ export interface JsonRpcRequest { jsonrpc: "2.0"; id: A2AJsonRpcId; @@ -72,11 +78,13 @@ export type TaskArtifact = z.infer; /** A complete task record including status, history, and optional artifacts. Derived from {@link TaskRecordSchema}. */ export type TaskRecord = z.infer; +/** Persistence layer for loading and saving {@link TaskRecord} instances keyed by agent and task ID. */ export interface TaskStore { load(params: { agentId: string; taskId: string }): Promise; save(params: { agentId: string; data: TaskRecord }): Promise; } +/** Per-request context forwarded to agent invocations and filter functions. */ export interface A2ARequestContext { userId?: string; sessionId?: string; @@ -84,13 +92,16 @@ export interface A2ARequestContext { requestUrl?: string; } +/** Parameters passed to an {@link A2AFilterFunction} for filtering a list of items. */ export interface A2AFilterParams { items: T[]; context?: A2ARequestContext; } +/** Callback that filters a list of items (e.g. agents) based on the current request context. */ export type A2AFilterFunction = (params: A2AFilterParams) => T[]; +/** Parameters for the `message/send` and `message/stream` JSON-RPC methods. */ export interface MessageSendParams { id?: string; sessionId?: string; @@ -99,21 +110,27 @@ export interface MessageSendParams { message: A2AMessage; } +/** Parameters for the `tasks/get` JSON-RPC method. */ export interface TaskQueryParams { id: string; historyLength?: number; metadata?: Record; } +/** Parameters for the `tasks/cancel` JSON-RPC method. */ export interface TaskIdParams { id: string; metadata?: Record; } +/** Result of a `message/send` or `message/stream` call — the updated {@link TaskRecord}. */ export type MessageSendResult = TaskRecord; +/** Result of a `tasks/get` call — the requested {@link TaskRecord}. */ export type TaskGetResult = TaskRecord; +/** Result of a `tasks/cancel` call — the canceled {@link TaskRecord}. */ export type TaskCancelResult = TaskRecord; +/** Configuration supplied to the {@link A2AServer} constructor. */ export interface A2AServerConfig { id?: string; name: string; @@ -132,12 +149,15 @@ export interface A2AServerConfig { taskStore?: TaskStore; } +/** Metadata describing an {@link A2AServer} instance (name, version, etc.). */ export interface A2AServerMetadata extends BaseA2AServerMetadata {} +/** Runtime dependencies injected into {@link A2AServer} via `initialize()`. */ export interface A2AServerDeps extends BaseA2AServerDeps { taskStore?: TaskStore; } +/** Minimal public surface of an A2A server, used by server-provider adapters. */ export interface A2AServerLike extends BaseA2AServerLike { getAgentCard?(agentId: string, context?: A2ARequestContext): AgentCard; handleRequest?( @@ -147,8 +167,10 @@ export interface A2AServerLike extends BaseA2AServerLike { ): Promise; } +/** Factory function that creates a new {@link A2AServerLike} instance on demand. */ export type A2AServerFactory = () => T; +/** A single skill advertised by an agent in its {@link AgentCard}. */ export interface AgentCardSkill { id: string; name: string; @@ -156,6 +178,7 @@ export interface AgentCardSkill { tags?: string[]; } +/** Public metadata card describing an agent's capabilities, skills, and endpoint URL. */ export interface AgentCard { name: string; description?: string; @@ -175,6 +198,7 @@ export interface AgentCard { skills: AgentCardSkill[]; } +/** Standard A2A JSON-RPC error codes, extending the base JSON-RPC 2.0 error range with task-specific codes. */ export const A2AErrorCode = { PARSE_ERROR: -32700, INVALID_REQUEST: -32600, @@ -189,7 +213,14 @@ export const A2AErrorCode = { export type A2AErrorCode = (typeof A2AErrorCode)[keyof typeof A2AErrorCode]; +/** + * Typed error class for A2A JSON-RPC failures. + * + * Provides static factory methods for every standard error code so callers + * never need to remember numeric codes. + */ export class VoltA2AError extends Error { + /** Creates a new {@link VoltA2AError} with the given code, message, and optional details. */ constructor( public code: A2AErrorCode, message: string, @@ -200,6 +231,7 @@ export class VoltA2AError extends Error { this.name = "VoltA2AError"; } + /** Converts this error into a plain {@link JsonRpcError} object suitable for serialisation. */ toJsonRpcError(): JsonRpcError { return { code: this.code, @@ -211,22 +243,27 @@ export class VoltA2AError extends Error { }; } + /** Creates a parse-error (-32700) for malformed JSON payloads. */ static parseError(details?: unknown) { return new VoltA2AError(A2AErrorCode.PARSE_ERROR, "Invalid JSON payload", details); } + /** Creates an invalid-request (-32600) error. */ static invalidRequest(message = "Invalid request", details?: unknown) { return new VoltA2AError(A2AErrorCode.INVALID_REQUEST, message, details); } + /** Creates a method-not-found (-32601) error for an unknown JSON-RPC method. */ static methodNotFound(method: string) { return new VoltA2AError(A2AErrorCode.METHOD_NOT_FOUND, `Unknown method '${method}'`); } + /** Creates an invalid-params (-32602) error. */ static invalidParams(message = "Invalid parameters", details?: unknown) { return new VoltA2AError(A2AErrorCode.INVALID_PARAMS, message, details); } + /** Creates a task-not-found (-32001) error for the given task ID. */ static taskNotFound(taskId: string) { return new VoltA2AError( A2AErrorCode.TASK_NOT_FOUND, @@ -236,6 +273,7 @@ export class VoltA2AError extends Error { ); } + /** Creates a task-not-cancelable (-32002) error for a task that can no longer be canceled. */ static taskNotCancelable(taskId: string) { return new VoltA2AError( A2AErrorCode.TASK_NOT_CANCELABLE, @@ -245,10 +283,12 @@ export class VoltA2AError extends Error { ); } + /** Creates an unsupported-operation (-32004) error. */ static unsupportedOperation(message = "Unsupported operation") { return new VoltA2AError(A2AErrorCode.UNSUPPORTED_OPERATION, message); } + /** Creates an internal-error (-32603) error. */ static internal(message = "Internal error", details?: unknown) { return new VoltA2AError(A2AErrorCode.INTERNAL_ERROR, message, details); } diff --git a/packages/resumable-streams/src/resumable-streams.ts b/packages/resumable-streams/src/resumable-streams.ts index 50544b426..80c282164 100644 --- a/packages/resumable-streams/src/resumable-streams.ts +++ b/packages/resumable-streams/src/resumable-streams.ts @@ -264,6 +264,12 @@ export const mergeStreamAndActiveStore = ( clearActiveStream: activeStreamStore.clearActiveStream, }); +/** + * Creates an in-memory active-stream store that tracks which stream ID is + * currently active for each conversation/user pair. + * + * @returns A {@link ResumableStreamActiveStore} backed by a `Map`. + */ export function createMemoryResumableStreamActiveStore(): ResumableStreamActiveStore { const activeStreams = new Map(); @@ -387,6 +393,14 @@ const createInMemoryPubSub = () => { return { publisher, subscriber }; }; +/** + * Creates an in-memory resumable stream store backed by the `resumable-stream` library. + * + * Useful for development and testing. Data does not survive process restarts. + * + * @param options - Optional key prefix and `waitUntil` callback. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + */ export async function createResumableStreamMemoryStore( options: ResumableStreamStoreOptions = {}, ): Promise { @@ -406,6 +420,15 @@ export async function createResumableStreamMemoryStore( return markResumableStreamStoreType(mergedStore, "memory", "Memory"); } +/** + * Creates a Redis-backed resumable stream store. + * + * If `publisher` / `subscriber` clients are not provided, they are created + * automatically from the `REDIS_URL` or `KV_URL` environment variable. + * + * @param options - Redis connection and key prefix options. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + */ export async function createResumableStreamRedisStore( options: ResumableStreamRedisStoreOptions = {}, ): Promise { @@ -450,6 +473,13 @@ export async function createResumableStreamRedisStore( return markResumableStreamStoreType(mergedStore, "redis", "Redis"); } +/** + * Creates a resumable stream store from user-supplied publisher and subscriber instances. + * + * @param options - Must include both `publisher` and `subscriber`. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + * @throws If `publisher` or `subscriber` is missing. + */ export async function createResumableStreamGenericStore( options: ResumableStreamGenericStoreOptions, ): Promise { @@ -472,6 +502,14 @@ export async function createResumableStreamGenericStore( return markResumableStreamStoreType(mergedStore, "custom", "Custom"); } +/** + * Creates a resumable stream store backed by the VoltOps managed service. + * + * Returns a disabled store when the required API keys are not configured. + * + * @param options - VoltOps client or API key configuration. + * @returns A {@link ResumableStreamStore} with active-stream tracking, or a disabled stub. + */ export async function createResumableStreamVoltOpsStore( options: ResumableStreamVoltOpsStoreOptions = {}, ): Promise { @@ -575,6 +613,16 @@ export async function createResumableStreamVoltOpsStore( return markResumableStreamStoreType(mergedStore, "voltops", "VoltOps"); } +/** + * Builds a {@link ResumableStreamAdapter} from a stream store and active-stream store. + * + * If `activeStreamStore` is not provided explicitly, it is inferred from `streamStore` + * when the store implements the {@link ResumableStreamActiveStore} interface. + * + * @param config - Must include `streamStore`; `activeStreamStore` is inferred when possible. + * @returns A fully wired {@link ResumableStreamAdapter}. + * @throws If `streamStore` is missing or `activeStreamStore` cannot be resolved. + */ export async function createResumableStreamAdapter( config: ResumableStreamAdapterConfig, ): Promise { @@ -628,6 +676,17 @@ export async function createResumableStreamAdapter( return adapter; } +/** + * Merges an existing resumable stream adapter into server-provider dependencies. + * + * If `deps` already contains a `resumableStream`, the provided adapter is ignored + * (with a warning). Disabled adapters are also filtered out. + * + * @param deps - Current server-provider dependencies. + * @param adapter - Optional adapter to inject. + * @param logger - Optional logger for warnings. + * @returns Updated dependencies with the resolved adapter, if any. + */ export async function resolveResumableStreamDeps( deps: ServerProviderDeps, adapter: ResumableStreamAdapter | undefined, @@ -651,6 +710,14 @@ export async function resolveResumableStreamDeps( }; } +/** + * Validates a {@link ResumableStreamAdapter}, returning `undefined` + * when the adapter is disabled or invalid. + * + * @param adapter - The adapter to validate. + * @param logger - Optional logger for warnings. + * @returns The validated adapter, or `undefined`. + */ export function resolveResumableStreamAdapter( adapter: ResumableStreamAdapter | undefined, logger?: Logger, From ee37043704d0a8cd6a505e441cb3872cff151e64 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Mon, 4 May 2026 11:10:46 -0500 Subject: [PATCH 21/22] fix(a2a-server): harden error handling, validation, and serialization - Mark task as failed and persist before rethrowing on non-abort errors in handleMessageSend so tasks don't stay stuck in "working" state - Replace hand-rolled checks in validateMessageSendParams with Zod schema validation (A2AMessageSchema) to reject malformed messages - Use strict undefined check in toJsonRpcError to preserve falsy data values (0, false, "") in serialized error details Signed-off-by: Riley Des --- packages/a2a-server/src/server.ts | 39 +++++++++++++++++++------------ packages/a2a-server/src/types.ts | 2 +- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index 71f3ae8aa..bba82889d 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -3,6 +3,7 @@ import { type Agent, convertUsage, getGlobalLogger } from "@voltagent/core"; import { buildAgentCard } from "./adapters/agent"; import { fromVoltAgentMessage, toVoltAgentMessage } from "./adapters/message"; import { createSuccessResponse, normalizeError } from "./protocol"; +import { A2AMessageSchema } from "./schemas"; import { InMemoryTaskStore } from "./store"; import { appendMessage, @@ -335,6 +336,22 @@ export class A2AServer { if (abortController.signal.aborted) { return await this.ensureCanceledRecord(agentId, record); } + + const failureText = + error instanceof Error ? error.message : "Task failed with an unknown error"; + const failureMessage: A2AMessage = { + kind: "message", + role: "agent", + messageId: randomUUID(), + taskId: record.id, + contextId: record.contextId, + parts: [{ kind: "text", text: failureText }], + }; + + record = appendMessage(record, failureMessage); + record = transitionStatus(record, { state: "failed", message: failureMessage }); + await taskStore.save({ agentId, data: record }); + throw error; } finally { this.clearActiveOperation(agentId, record.id); @@ -695,24 +712,16 @@ export class A2AServer { if (!payload || typeof payload !== "object") { throw VoltA2AError.invalidParams("Params must be an object"); } - const candidate = payload as Partial; - - if (!candidate.message || typeof candidate.message !== "object") { - throw VoltA2AError.invalidParams("'message' must be provided"); - } + const candidate = payload as Record; - if (!Array.isArray(candidate.message.parts) || candidate.message.parts.length === 0) { - throw VoltA2AError.invalidParams("Message must include at least one part"); - } - - const hasInvalidPart = candidate.message.parts.some( - (part) => part.kind !== "text" || typeof part.text !== "string", - ); - if (hasInvalidPart) { - throw VoltA2AError.invalidParams("Only plain text message parts are supported"); + try { + A2AMessageSchema.parse(candidate.message); + } catch (error) { + const message = error instanceof Error ? error.message : "Invalid message payload"; + throw VoltA2AError.invalidParams(message); } - return candidate as MessageSendParams; + return candidate as unknown as MessageSendParams; } private validateTaskQueryParams(payload: unknown): TaskQueryParams { diff --git a/packages/a2a-server/src/types.ts b/packages/a2a-server/src/types.ts index 8980bc9ec..1bb9ed6c4 100644 --- a/packages/a2a-server/src/types.ts +++ b/packages/a2a-server/src/types.ts @@ -238,7 +238,7 @@ export class VoltA2AError extends Error { message: this.message, data: { taskId: this.taskId, - ...(this.data ? { details: this.data } : {}), + ...(this.data !== undefined ? { details: this.data } : {}), }, }; } From 49941b1efe0584f5cb5b4721ea49e9e88f3e6604 Mon Sep 17 00:00:00 2001 From: Riley Des Date: Mon, 4 May 2026 11:38:54 -0500 Subject: [PATCH 22/22] fix(a2a-server): add error handling for task failure persistence Signed-off-by: Riley Des --- packages/a2a-server/src/server.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index bba82889d..2172ae8b5 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -350,7 +350,18 @@ export class A2AServer { record = appendMessage(record, failureMessage); record = transitionStatus(record, { state: "failed", message: failureMessage }); - await taskStore.save({ agentId, data: record }); + + try { + await taskStore.save({ agentId, data: record }); + } catch (saveErr) { + getGlobalLogger() + .child({ component: "a2a-server" }) + .warn("Failed to persist task failure status", { + agentId, + taskId: record.id, + saveError: saveErr, + }); + } throw error; } finally {