From 1fb6016129a31b994023b05854525da93d78c14e Mon Sep 17 00:00:00 2001 From: Volodymyr Vreshch Date: Wed, 15 Apr 2026 22:59:37 +0200 Subject: [PATCH] =?UTF-8?q?feat(core):=20combinators=20MVP=20=E2=80=94=20s?= =?UTF-8?q?equence,=20parallel,=20map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3. Pure functions over ctx.run — each combinator IS an Agent, so they recurse without limit. Runs in-process via makeCtxRun or through the daemon via runtime.dispatch transparently. Combinators (src/combinators/index.ts) - sequence(...steps) — run in order; if step's result.output is an object it becomes next step's input.config; halt on first failure - parallel(steps) — concurrent; tuple of outputs in input order; any child failure flips overall success=false, does NOT cancel siblings (caller decides via result) - map(items, factory) — fan-out; each child's input.config gains { item }; output is array in item order Helper (src/combinators/merge.ts) - mergeGenerators(gens): interleaves yielded T in arrival order, collects R[] returns in input order. Used by parallel/map to stream child events as they arrive while preserving result ordering. Accept StepRef = Agent | string; string refs resolve via ctx.run's registry (same rules as ctx.run itself). Tests (+16, total 123) - merge: multi-source yield + return collection, empty, fast/slow arrival ordering - sequence: empty, object output threads as config, halt on failure, string ref via registry, event forwarding across steps - parallel: empty, outputs in input order, partial failure, event interleaving - map: empty items, factory per-item, { item } injection into config, partial failure Exports: sequence, parallel, map, StepRef, MapFactory from @agentage/core. --- packages/core/src/combinators/index.test.ts | 168 ++++++++++++++++++++ packages/core/src/combinators/index.ts | 112 +++++++++++++ packages/core/src/combinators/merge.test.ts | 72 +++++++++ packages/core/src/combinators/merge.ts | 34 ++++ packages/core/src/index.ts | 4 + 5 files changed, 390 insertions(+) create mode 100644 packages/core/src/combinators/index.test.ts create mode 100644 packages/core/src/combinators/index.ts create mode 100644 packages/core/src/combinators/merge.test.ts create mode 100644 packages/core/src/combinators/merge.ts diff --git a/packages/core/src/combinators/index.test.ts b/packages/core/src/combinators/index.test.ts new file mode 100644 index 0000000..48c0c73 --- /dev/null +++ b/packages/core/src/combinators/index.test.ts @@ -0,0 +1,168 @@ +import { describe, it, expect } from 'vitest'; +import { map, parallel, sequence } from './index.js'; +import { collectEvents, makeFakeAgent, mockRegistry } from '../testing/index.js'; +import { output as outputEvent } from '../events.js'; + +describe('sequence', () => { + it('returns success with undefined output for empty', async () => { + const s = sequence(); + const { result } = await collectEvents(s); + expect(result).toEqual({ success: true, output: undefined }); + }); + + it('runs steps in order, threads object output as config', async () => { + const seen: Array | undefined> = []; + const stepA = makeFakeAgent({ + name: 'a', + output: { fromA: 1 }, + }); + const stepB = makeFakeAgent({ + name: 'b', + output: { fromB: 2 }, + }); + // Wrap b to record its input.config + const bWithCapture = { + ...stepB, + run: async ( + input: Parameters[0], + runtime: Parameters[1] + ) => { + seen.push(input.config); + return stepB.run(input, runtime); + }, + }; + const s = sequence(stepA, bWithCapture); + const { result } = await collectEvents(s); + expect(result?.success).toBe(true); + expect(result?.output).toEqual({ fromB: 2 }); + expect(seen[0]).toEqual({ fromA: 1 }); + }); + + it('halts on first failing step; returns its output', async () => { + const stepA = makeFakeAgent({ name: 'a', fail: 'boom' }); + let bCalled = false; + const stepB = { + ...makeFakeAgent({ name: 'b' }), + run: async (input: RunInput, runtime: Parameters[1]) => { + bCalled = true; + return makeFakeAgent({ name: 'b' }).run(input, runtime); + }, + } as unknown as ReturnType; + const s = sequence(stepA, stepB); + const { result } = await collectEvents(s); + expect(result?.success).toBe(false); + expect(bCalled).toBe(false); + }); + + it('resolves steps by string ref via registry', async () => { + const stepA = makeFakeAgent({ name: 'a', output: { x: 1 } }); + const stepB = makeFakeAgent({ name: 'b', output: { y: 2 } }); + const registry = mockRegistry({ a: stepA, b: stepB }); + + const s = sequence('a', 'b'); + const { result } = await collectEvents(s, { task: '' }, { registry }); + expect(result?.success).toBe(true); + expect(result?.output).toEqual({ y: 2 }); + }); + + it('forwards events from all steps', async () => { + const stepA = makeFakeAgent({ name: 'a', events: [outputEvent('a-evt')] }); + const stepB = makeFakeAgent({ name: 'b', events: [outputEvent('b-evt')] }); + const s = sequence(stepA, stepB); + const { events } = await collectEvents(s); + const texts = events + .filter((e) => e.data.type === 'output') + .map((e) => (e.data as { content: unknown }).content); + expect(texts).toContain('a-evt'); + expect(texts).toContain('b-evt'); + }); +}); + +describe('parallel', () => { + it('returns [] for empty steps', async () => { + const p = parallel([]); + const { result } = await collectEvents(p); + expect(result).toEqual({ success: true, output: [] }); + }); + + it('runs all steps; returns outputs in input order', async () => { + const p = parallel([ + makeFakeAgent({ name: 'a', output: 'A' }), + makeFakeAgent({ name: 'b', output: 'B' }), + makeFakeAgent({ name: 'c', output: 'C' }), + ]); + const { result } = await collectEvents(p); + expect(result?.success).toBe(true); + expect(result?.output).toEqual(['A', 'B', 'C']); + }); + + it('flips success to false when any child fails, still collects all', async () => { + const p = parallel([ + makeFakeAgent({ name: 'a', output: 'A' }), + makeFakeAgent({ name: 'b', fail: 'b-err' }), + makeFakeAgent({ name: 'c', output: 'C' }), + ]); + const { result } = await collectEvents(p); + expect(result?.success).toBe(false); + expect((result?.output as unknown[]).length).toBe(3); + }); + + it('interleaves events from concurrent children', async () => { + const p = parallel([ + makeFakeAgent({ name: 'a', events: [outputEvent('a-evt')] }), + makeFakeAgent({ name: 'b', events: [outputEvent('b-evt')] }), + ]); + const { events } = await collectEvents(p); + const texts = events + .filter((e) => e.data.type === 'output') + .map((e) => (e.data as { content: unknown }).content); + expect(texts).toContain('a-evt'); + expect(texts).toContain('b-evt'); + }); +}); + +describe('map', () => { + it('returns [] for empty items', async () => { + const m = map([], () => makeFakeAgent({ name: 'x' })); + const { result } = await collectEvents(m); + expect(result).toEqual({ success: true, output: [] }); + }); + + it('runs factory per item and collects outputs in order', async () => { + const items = [10, 20, 30]; + const m = map(items, (item) => makeFakeAgent({ name: `x${item}`, output: item * 2 })); + const { result } = await collectEvents(m); + expect(result?.success).toBe(true); + expect(result?.output).toEqual([20, 40, 60]); + }); + + it('passes { item } as config.item to each child', async () => { + const seen: unknown[] = []; + const m = map([1, 2], (item) => { + const a = makeFakeAgent({ name: `x${item}`, output: item }); + return { + ...a, + run: async (input: Parameters[0], runtime: Parameters[1]) => { + seen.push((input.config as { item: unknown } | undefined)?.item); + return a.run(input, runtime); + }, + }; + }); + await collectEvents(m); + expect(seen.sort()).toEqual([1, 2]); + }); + + it('flips success to false when any child fails', async () => { + const m = map([1, 2], (i) => + i === 2 ? makeFakeAgent({ name: 'x', fail: 'bad' }) : makeFakeAgent({ name: 'x', output: i }) + ); + const { result } = await collectEvents(m); + expect(result?.success).toBe(false); + expect((result?.output as unknown[]).length).toBe(2); + }); +}); + +// Import types referenced above (pulled up here so the file reads top-to-bottom) +import type { Agent, RunInput } from '../types.js'; +void ({} as Agent); +void ({} as RunInput); diff --git a/packages/core/src/combinators/index.ts b/packages/core/src/combinators/index.ts new file mode 100644 index 0000000..376319e --- /dev/null +++ b/packages/core/src/combinators/index.ts @@ -0,0 +1,112 @@ +/** + * Composition combinators — each returns an Agent so they recurse without limit. + * + * sequence(a, b, c) — run a, b, c in order; thread output.config forward + * parallel([a, b, c]) — run concurrently; tuple of results; fail-fast + * map(items, factory) — fan-out; one child per item; array of results + * + * All three are pure functions over ctx.run — the combinator IS an Agent + * whose run() invokes ctx.run for each step. Runs in-process (via default + * makeCtxRun) or through the daemon (via runtime.dispatch) transparently. + */ +import { agent } from '../agent.js'; +import { result as resultEvent } from '../events.js'; +import type { Agent, CtxRunResult, RunInput } from '../types.js'; +import { mergeGenerators } from './merge.js'; + +export type StepRef = Agent | string; + +/** Input is threaded forward: if a step's output is an object, it replaces `config`. */ +const threadInput = (base: RunInput, previousOutput: unknown): RunInput => { + if (previousOutput === undefined || previousOutput === null) return base; + if (typeof previousOutput === 'object' && !Array.isArray(previousOutput)) { + return { ...base, config: previousOutput as Record }; + } + return base; +}; + +/** + * Run `steps` one after another. Each step's `result.output` (if an object) + * becomes the next step's `input.config`. Halts on first failure and yields + * a failing result. The overall `output` is the final step's output. + */ +export const sequence = (...steps: StepRef[]): Agent => + agent({ + name: 'sequence', + description: `sequence(${steps.length} steps)`, + async *run(input, ctx) { + if (steps.length === 0) { + yield resultEvent(true); + return; + } + let carry: unknown; + let last: CtxRunResult = { success: true }; + for (let i = 0; i < steps.length; i++) { + const step = steps[i]; + const stepInput = i === 0 ? input : threadInput(input, carry); + last = yield* ctx.run(step, stepInput); + if (!last.success) { + yield resultEvent(false, last.output); + return; + } + carry = last.output; + } + yield resultEvent(true, last.output); + }, + }); + +/** + * Run `steps` concurrently with the same input. Events interleave as they + * arrive (via mergeGenerators). The overall result is an array of child + * outputs in input order; any child failure flips overall success to false + * but does NOT cancel the other children — caller decides via the result. + */ +export const parallel = (steps: StepRef[]): Agent => + agent({ + name: 'parallel', + description: `parallel(${steps.length} steps)`, + async *run(input, ctx) { + if (steps.length === 0) { + yield resultEvent(true, []); + return; + } + const gens = steps.map((step) => ctx.run(step, input)); + const childResults = yield* mergeGenerators(gens); + const outputs = childResults.map((r) => r.output); + const allSucceeded = childResults.every((r) => r.success); + yield resultEvent(allSucceeded, outputs); + }, + }); + +export type MapFactory = (item: T, index: number) => StepRef; + +/** + * Fan-out: for each item, resolve the step (via factory) and run it + * concurrently. Each child's input is the parent's `task` with a `config` + * object of `{ item }` (merged over parent input.config) — the agent reads + * `input.config.item` to get its slice. + * + * Overall output is an array of child outputs in item order. + */ +export const map = (items: T[], factory: MapFactory): Agent => + agent({ + name: 'map', + description: `map(${items.length} items)`, + async *run(input, ctx) { + if (items.length === 0) { + yield resultEvent(true, []); + return; + } + const gens = items.map((item, index) => { + const childInput: RunInput = { + ...input, + config: { ...(input.config ?? {}), item }, + }; + return ctx.run(factory(item, index), childInput); + }); + const childResults = yield* mergeGenerators(gens); + const outputs = childResults.map((r) => r.output); + const allSucceeded = childResults.every((r) => r.success); + yield resultEvent(allSucceeded, outputs); + }, + }); diff --git a/packages/core/src/combinators/merge.test.ts b/packages/core/src/combinators/merge.test.ts new file mode 100644 index 0000000..76b450d --- /dev/null +++ b/packages/core/src/combinators/merge.test.ts @@ -0,0 +1,72 @@ +import { describe, it, expect } from 'vitest'; +import { mergeGenerators } from './merge.js'; + +async function* sourceA(): AsyncGenerator { + yield 'a1'; + yield 'a2'; + return 1; +} +async function* sourceB(): AsyncGenerator { + yield 'b1'; + return 2; +} + +const waitFor = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +async function* slowSource( + label: string, + count: number, + delay: number +): AsyncGenerator { + for (let i = 0; i < count; i++) { + await waitFor(delay); + yield `${label}${i}`; + } + return count; +} + +describe('mergeGenerators', () => { + it('yields values from all generators and collects returns in input order', async () => { + const merged = mergeGenerators([sourceA(), sourceB()]); + const seen: string[] = []; + let done: number[] | undefined; + while (true) { + const next = await merged.next(); + if (next.done) { + done = next.value; + break; + } + seen.push(next.value); + } + expect(seen.sort()).toEqual(['a1', 'a2', 'b1']); + expect(done).toEqual([1, 2]); + }); + + it('empty input -> empty returns, no yields', async () => { + const merged = mergeGenerators([]); + const next = await merged.next(); + expect(next.done).toBe(true); + if (next.done) expect(next.value).toEqual([]); + }); + + it('interleaves fast and slow generators (arrival order)', async () => { + // fast yields twice rapidly; slow yields after a pause + const fast = (async function* (): AsyncGenerator { + yield 'f1'; + yield 'f2'; + return 1; + })(); + const slow = slowSource('s', 2, 20); + + const merged = mergeGenerators([fast, slow]); + const seen: string[] = []; + while (true) { + const next = await merged.next(); + if (next.done) break; + seen.push(next.value); + } + // fast yields should appear before slow yields + expect(seen.indexOf('f1')).toBeLessThan(seen.indexOf('s0')); + expect(seen.indexOf('f2')).toBeLessThan(seen.indexOf('s1')); + }); +}); diff --git a/packages/core/src/combinators/merge.ts b/packages/core/src/combinators/merge.ts new file mode 100644 index 0000000..318d91b --- /dev/null +++ b/packages/core/src/combinators/merge.ts @@ -0,0 +1,34 @@ +/** + * Merge N async generators that yield T and return R — interleaves yielded + * values in arrival order, collects returns into an R[] at the end. + * + * Used by `parallel` to stream child events as they arrive while preserving + * ordered access to each child's final CtxRunResult. + */ +export async function* mergeGenerators( + gens: Array> +): AsyncGenerator { + const pending = new Map }>>(); + gens.forEach((g, i) => + pending.set( + i, + g.next().then((res) => ({ i, res })) + ) + ); + + const results: R[] = new Array(gens.length); + while (pending.size > 0) { + const { i, res } = await Promise.race([...pending.values()]); + if (res.done) { + results[i] = res.value; + pending.delete(i); + } else { + yield res.value; + pending.set( + i, + gens[i].next().then((r) => ({ i, res: r })) + ); + } + } + return results; +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 5519721..9db1d56 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -31,6 +31,10 @@ export { isTerminal, canTransition } from './state-machine.js'; // Agent builder export { agent, makeCtxRun, DEFAULT_DEPTH_LIMIT } from './agent.js'; +// Composition combinators +export { sequence, parallel, map } from './combinators/index.js'; +export type { StepRef, MapFactory } from './combinators/index.js'; + // Event helpers export { output, progress, error, result } from './events.js';