Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ export const makeOrchestrationIntegrationHarness = (

const scope = yield* Scope.make("sequential");
yield* tryRuntimePromise("start OrchestrationReactor", () =>
runtime.runPromise(reactor.start.pipe(Scope.provide(scope))),
runtime.runPromise(reactor.start().pipe(Scope.provide(scope))),
).pipe(Effect.orDie);
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ describe("CheckpointReactor", () => {
const reactor = await runtime.runPromise(Effect.service(CheckpointReactor));
const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
const drain = () => Effect.runPromise(reactor.drain);

const createdAt = new Date().toISOString();
Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ const make = Effect.gen(function* () {

const worker = yield* makeDrainableWorker(processInputSafely);

const start: CheckpointReactorShape["start"] = Effect.gen(function* () {
const start: CheckpointReactorShape["start"] = Effect.fn("start")(function* () {
yield* Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
if (
Expand Down
17 changes: 10 additions & 7 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,28 @@ describe("OrchestrationReactor", () => {
Layer.effect(OrchestrationReactor, makeOrchestrationReactor).pipe(
Layer.provideMerge(
Layer.succeed(ProviderRuntimeIngestionService, {
start: Effect.sync(() => {
start: () => {
started.push("provider-runtime-ingestion");
}),
return Effect.void;
},
drain: Effect.void,
}),
),
Layer.provideMerge(
Layer.succeed(ProviderCommandReactor, {
start: Effect.sync(() => {
start: () => {
started.push("provider-command-reactor");
}),
return Effect.void;
},
drain: Effect.void,
}),
),
Layer.provideMerge(
Layer.succeed(CheckpointReactor, {
start: Effect.sync(() => {
start: () => {
started.push("checkpoint-reactor");
}),
return Effect.void;
},
drain: Effect.void,
}),
),
Expand All @@ -51,7 +54,7 @@ describe("OrchestrationReactor", () => {

const reactor = await runtime.runPromise(Effect.service(OrchestrationReactor));
const scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));

expect(started).toEqual([
"provider-runtime-ingestion",
Expand Down
8 changes: 4 additions & 4 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ export const makeOrchestrationReactor = Effect.gen(function* () {
const providerCommandReactor = yield* ProviderCommandReactor;
const checkpointReactor = yield* CheckpointReactor;

const start: OrchestrationReactorShape["start"] = Effect.gen(function* () {
yield* providerRuntimeIngestion.start;
yield* providerCommandReactor.start;
yield* checkpointReactor.start;
const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () {
yield* providerRuntimeIngestion.start();
yield* providerCommandReactor.start();
yield* checkpointReactor.start();
});

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ describe("ProviderCommandReactor", () => {
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
const drain = () => Effect.runPromise(reactor.drain);

await Effect.runPromise(
Expand Down
26 changes: 14 additions & 12 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -773,22 +773,24 @@ const make = Effect.gen(function* () {

const worker = yield* makeDrainableWorker(processDomainEventSafely);

const start: ProviderCommandReactorShape["start"] = Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () {
const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) {
if (
event.type !== "thread.runtime-mode-set" &&
event.type !== "thread.turn-start-requested" &&
event.type !== "thread.turn-interrupt-requested" &&
event.type !== "thread.approval-response-requested" &&
event.type !== "thread.user-input-response-requested" &&
event.type !== "thread.session-stop-requested"
event.type === "thread.runtime-mode-set" ||
event.type === "thread.turn-start-requested" ||
event.type === "thread.turn-interrupt-requested" ||
event.type === "thread.approval-response-requested" ||
event.type === "thread.user-input-response-requested" ||
event.type === "thread.session-stop-requested"
) {
return Effect.void;
return yield* worker.enqueue(event);
}
});

return worker.enqueue(event);
}),
).pipe(Effect.asVoid);
yield* Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, processEvent),
);
});

return {
start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ describe("ProviderRuntimeIngestion", () => {
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope)));
await Effect.runPromise(ingestion.start().pipe(Scope.provide(scope)));
const drain = () => Effect.runPromise(ingestion.drain);

const createdAt = new Date().toISOString();
Expand Down
Loading
Loading