Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions packages/core/src/combinators/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { describe, it, expect } from 'vitest';
import { map, parallel, sequence } from './index.js';
import { collectEvents, makeFakeAgent, mockRegistry } from '../testing/index.js';
import { output as outputEvent } from '../events.js';

describe('sequence', () => {
it('returns success with undefined output for empty', async () => {
const s = sequence();
const { result } = await collectEvents(s);
expect(result).toEqual({ success: true, output: undefined });
});

it('runs steps in order, threads object output as config', async () => {
const seen: Array<Record<string, unknown> | undefined> = [];
const stepA = makeFakeAgent({
name: 'a',
output: { fromA: 1 },
});
const stepB = makeFakeAgent({
name: 'b',
output: { fromB: 2 },
});
// Wrap b to record its input.config
const bWithCapture = {
...stepB,
run: async (
input: Parameters<typeof stepB.run>[0],
runtime: Parameters<typeof stepB.run>[1]
) => {
seen.push(input.config);
return stepB.run(input, runtime);
},
};
const s = sequence(stepA, bWithCapture);
const { result } = await collectEvents(s);
expect(result?.success).toBe(true);
expect(result?.output).toEqual({ fromB: 2 });
expect(seen[0]).toEqual({ fromA: 1 });
});

it('halts on first failing step; returns its output', async () => {
const stepA = makeFakeAgent({ name: 'a', fail: 'boom' });
let bCalled = false;
const stepB = {
...makeFakeAgent({ name: 'b' }),
run: async (input: RunInput, runtime: Parameters<Agent['run']>[1]) => {
bCalled = true;
return makeFakeAgent({ name: 'b' }).run(input, runtime);
},
} as unknown as ReturnType<typeof makeFakeAgent>;
const s = sequence(stepA, stepB);
const { result } = await collectEvents(s);
expect(result?.success).toBe(false);
expect(bCalled).toBe(false);
});

it('resolves steps by string ref via registry', async () => {
const stepA = makeFakeAgent({ name: 'a', output: { x: 1 } });
const stepB = makeFakeAgent({ name: 'b', output: { y: 2 } });
const registry = mockRegistry({ a: stepA, b: stepB });

const s = sequence('a', 'b');
const { result } = await collectEvents(s, { task: '' }, { registry });
expect(result?.success).toBe(true);
expect(result?.output).toEqual({ y: 2 });
});

it('forwards events from all steps', async () => {
const stepA = makeFakeAgent({ name: 'a', events: [outputEvent('a-evt')] });
const stepB = makeFakeAgent({ name: 'b', events: [outputEvent('b-evt')] });
const s = sequence(stepA, stepB);
const { events } = await collectEvents(s);
const texts = events
.filter((e) => e.data.type === 'output')
.map((e) => (e.data as { content: unknown }).content);
expect(texts).toContain('a-evt');
expect(texts).toContain('b-evt');
});
});

describe('parallel', () => {
it('returns [] for empty steps', async () => {
const p = parallel([]);
const { result } = await collectEvents(p);
expect(result).toEqual({ success: true, output: [] });
});

it('runs all steps; returns outputs in input order', async () => {
const p = parallel([
makeFakeAgent({ name: 'a', output: 'A' }),
makeFakeAgent({ name: 'b', output: 'B' }),
makeFakeAgent({ name: 'c', output: 'C' }),
]);
const { result } = await collectEvents(p);
expect(result?.success).toBe(true);
expect(result?.output).toEqual(['A', 'B', 'C']);
});

it('flips success to false when any child fails, still collects all', async () => {
const p = parallel([
makeFakeAgent({ name: 'a', output: 'A' }),
makeFakeAgent({ name: 'b', fail: 'b-err' }),
makeFakeAgent({ name: 'c', output: 'C' }),
]);
const { result } = await collectEvents(p);
expect(result?.success).toBe(false);
expect((result?.output as unknown[]).length).toBe(3);
});

it('interleaves events from concurrent children', async () => {
const p = parallel([
makeFakeAgent({ name: 'a', events: [outputEvent('a-evt')] }),
makeFakeAgent({ name: 'b', events: [outputEvent('b-evt')] }),
]);
const { events } = await collectEvents(p);
const texts = events
.filter((e) => e.data.type === 'output')
.map((e) => (e.data as { content: unknown }).content);
expect(texts).toContain('a-evt');
expect(texts).toContain('b-evt');
});
});

describe('map', () => {
it('returns [] for empty items', async () => {
const m = map<number>([], () => makeFakeAgent({ name: 'x' }));
const { result } = await collectEvents(m);
expect(result).toEqual({ success: true, output: [] });
});

it('runs factory per item and collects outputs in order', async () => {
const items = [10, 20, 30];
const m = map(items, (item) => makeFakeAgent({ name: `x${item}`, output: item * 2 }));
const { result } = await collectEvents(m);
expect(result?.success).toBe(true);
expect(result?.output).toEqual([20, 40, 60]);
});

it('passes { item } as config.item to each child', async () => {
const seen: unknown[] = [];
const m = map([1, 2], (item) => {
const a = makeFakeAgent({ name: `x${item}`, output: item });
return {
...a,
run: async (input: Parameters<typeof a.run>[0], runtime: Parameters<typeof a.run>[1]) => {
seen.push((input.config as { item: unknown } | undefined)?.item);
return a.run(input, runtime);
},
};
});
await collectEvents(m);
expect(seen.sort()).toEqual([1, 2]);
});

it('flips success to false when any child fails', async () => {
const m = map([1, 2], (i) =>
i === 2 ? makeFakeAgent({ name: 'x', fail: 'bad' }) : makeFakeAgent({ name: 'x', output: i })
);
const { result } = await collectEvents(m);
expect(result?.success).toBe(false);
expect((result?.output as unknown[]).length).toBe(2);
});
});

// Import types referenced above (pulled up here so the file reads top-to-bottom)
import type { Agent, RunInput } from '../types.js';
void ({} as Agent);
void ({} as RunInput);
112 changes: 112 additions & 0 deletions packages/core/src/combinators/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Composition combinators — each returns an Agent so they recurse without limit.
*
* sequence(a, b, c) — run a, b, c in order; thread output.config forward
* parallel([a, b, c]) — run concurrently; tuple of results; fail-fast
* map(items, factory) — fan-out; one child per item; array of results
*
* All three are pure functions over ctx.run — the combinator IS an Agent
* whose run() invokes ctx.run for each step. Runs in-process (via default
* makeCtxRun) or through the daemon (via runtime.dispatch) transparently.
*/
import { agent } from '../agent.js';
import { result as resultEvent } from '../events.js';
import type { Agent, CtxRunResult, RunInput } from '../types.js';
import { mergeGenerators } from './merge.js';

export type StepRef = Agent | string;

/** Input is threaded forward: if a step's output is an object, it replaces `config`. */
const threadInput = (base: RunInput, previousOutput: unknown): RunInput => {
if (previousOutput === undefined || previousOutput === null) return base;
if (typeof previousOutput === 'object' && !Array.isArray(previousOutput)) {
return { ...base, config: previousOutput as Record<string, unknown> };
}
return base;
};

/**
* Run `steps` one after another. Each step's `result.output` (if an object)
* becomes the next step's `input.config`. Halts on first failure and yields
* a failing result. The overall `output` is the final step's output.
*/
export const sequence = (...steps: StepRef[]): Agent =>
agent({
name: 'sequence',
description: `sequence(${steps.length} steps)`,
async *run(input, ctx) {
if (steps.length === 0) {
yield resultEvent(true);
return;
}
let carry: unknown;
let last: CtxRunResult = { success: true };
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
const stepInput = i === 0 ? input : threadInput(input, carry);
last = yield* ctx.run(step, stepInput);
if (!last.success) {
yield resultEvent(false, last.output);
return;
}
carry = last.output;
}
yield resultEvent(true, last.output);
},
});

/**
* Run `steps` concurrently with the same input. Events interleave as they
* arrive (via mergeGenerators). The overall result is an array of child
* outputs in input order; any child failure flips overall success to false
* but does NOT cancel the other children — caller decides via the result.
*/
export const parallel = (steps: StepRef[]): Agent =>
agent({
name: 'parallel',
description: `parallel(${steps.length} steps)`,
async *run(input, ctx) {
if (steps.length === 0) {
yield resultEvent(true, []);
return;
}
const gens = steps.map((step) => ctx.run(step, input));
const childResults = yield* mergeGenerators(gens);
const outputs = childResults.map((r) => r.output);
const allSucceeded = childResults.every((r) => r.success);
yield resultEvent(allSucceeded, outputs);
},
});

export type MapFactory<T> = (item: T, index: number) => StepRef;

/**
* Fan-out: for each item, resolve the step (via factory) and run it
* concurrently. Each child's input is the parent's `task` with a `config`
* object of `{ item }` (merged over parent input.config) — the agent reads
* `input.config.item` to get its slice.
*
* Overall output is an array of child outputs in item order.
*/
export const map = <T>(items: T[], factory: MapFactory<T>): Agent =>
agent({
name: 'map',
description: `map(${items.length} items)`,
async *run(input, ctx) {
if (items.length === 0) {
yield resultEvent(true, []);
return;
}
const gens = items.map((item, index) => {
const childInput: RunInput = {
...input,
config: { ...(input.config ?? {}), item },
};
return ctx.run(factory(item, index), childInput);
});
const childResults = yield* mergeGenerators(gens);
const outputs = childResults.map((r) => r.output);
const allSucceeded = childResults.every((r) => r.success);
yield resultEvent(allSucceeded, outputs);
},
});
72 changes: 72 additions & 0 deletions packages/core/src/combinators/merge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { describe, it, expect } from 'vitest';
import { mergeGenerators } from './merge.js';

async function* sourceA(): AsyncGenerator<string, number, void> {
yield 'a1';
yield 'a2';
return 1;
}
async function* sourceB(): AsyncGenerator<string, number, void> {
yield 'b1';
return 2;
}

const waitFor = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function* slowSource(
label: string,
count: number,
delay: number
): AsyncGenerator<string, number, void> {
for (let i = 0; i < count; i++) {
await waitFor(delay);
yield `${label}${i}`;
}
return count;
}

describe('mergeGenerators', () => {
it('yields values from all generators and collects returns in input order', async () => {
const merged = mergeGenerators([sourceA(), sourceB()]);
const seen: string[] = [];
let done: number[] | undefined;
while (true) {
const next = await merged.next();
if (next.done) {
done = next.value;
break;
}
seen.push(next.value);
}
expect(seen.sort()).toEqual(['a1', 'a2', 'b1']);
expect(done).toEqual([1, 2]);
});

it('empty input -> empty returns, no yields', async () => {
const merged = mergeGenerators<string, number>([]);
const next = await merged.next();
expect(next.done).toBe(true);
if (next.done) expect(next.value).toEqual([]);
});

it('interleaves fast and slow generators (arrival order)', async () => {
// fast yields twice rapidly; slow yields after a pause
const fast = (async function* (): AsyncGenerator<string, number, void> {
yield 'f1';
yield 'f2';
return 1;
})();
const slow = slowSource('s', 2, 20);

const merged = mergeGenerators<string, number>([fast, slow]);
const seen: string[] = [];
while (true) {
const next = await merged.next();
if (next.done) break;
seen.push(next.value);
}
// fast yields should appear before slow yields
expect(seen.indexOf('f1')).toBeLessThan(seen.indexOf('s0'));
expect(seen.indexOf('f2')).toBeLessThan(seen.indexOf('s1'));
});
});
34 changes: 34 additions & 0 deletions packages/core/src/combinators/merge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Merge N async generators that yield T and return R — interleaves yielded
* values in arrival order, collects returns into an R[] at the end.
*
* Used by `parallel` to stream child events as they arrive while preserving
* ordered access to each child's final CtxRunResult.
*/
export async function* mergeGenerators<T, R>(
gens: Array<AsyncGenerator<T, R, void>>
): AsyncGenerator<T, R[], void> {
const pending = new Map<number, Promise<{ i: number; res: IteratorResult<T, R> }>>();
gens.forEach((g, i) =>
pending.set(
i,
g.next().then((res) => ({ i, res }))
)
);

const results: R[] = new Array(gens.length);
while (pending.size > 0) {
const { i, res } = await Promise.race([...pending.values()]);
if (res.done) {
results[i] = res.value;
pending.delete(i);
} else {
yield res.value;
pending.set(
i,
gens[i].next().then((r) => ({ i, res: r }))
);
}
}
return results;
}
Loading
Loading