Skip to content

Commit 92c0e40

Browse files
committed
Fix worker error cleanup and skip subprocess polling when idle
- CoalescingDrainableWorker: Add Effect.onError handler to remove key from activeKeys when process fails, preventing drainKey/drain from hanging forever - Terminal Manager: Skip subprocess polling when no running sessions exist, avoiding unnecessary subprocess checker invocations during idle periods
1 parent 0a77ebc commit 92c0e40

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

apps/server/src/terminal/Layers/Manager.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1377,6 +1377,10 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith
13771377
session.status === "running" && Number.isInteger(session.pid),
13781378
);
13791379

1380+
if (runningSessions.length === 0) {
1381+
return;
1382+
}
1383+
13801384
yield* Effect.forEach(
13811385
runningSessions,
13821386
(session) =>
@@ -1434,8 +1438,20 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith
14341438
);
14351439
});
14361440

1441+
const hasRunningSessions = readManagerState.pipe(
1442+
Effect.map((state) => [...state.sessions.values()].some((s) => s.status === "running")),
1443+
);
1444+
14371445
yield* Effect.forever(
1438-
Effect.sleep(subprocessPollIntervalMs).pipe(Effect.flatMap(() => pollSubprocessActivity())),
1446+
hasRunningSessions.pipe(
1447+
Effect.flatMap((active) =>
1448+
active
1449+
? pollSubprocessActivity().pipe(
1450+
Effect.flatMap(() => Effect.sleep(subprocessPollIntervalMs)),
1451+
)
1452+
: Effect.sleep(subprocessPollIntervalMs),
1453+
),
1454+
),
14391455
).pipe(Effect.forkIn(workerScope));
14401456

14411457
yield* Effect.addFinalizer(() =>

packages/shared/src/CoalescingDrainableWorker.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,17 @@ export const makeCoalescingDrainableWorker = <K, V, E, R>(options: {
7777
] as const;
7878
}).pipe(Effect.tx),
7979
),
80-
Effect.flatMap((item) => (item === null ? Effect.void : processKey(item.key, item.value))),
80+
Effect.flatMap((item) =>
81+
item === null
82+
? Effect.void
83+
: Effect.onError(processKey(item.key, item.value), () =>
84+
TxRef.update(stateRef, (state) => {
85+
const activeKeys = new Set(state.activeKeys);
86+
activeKeys.delete(item.key);
87+
return { ...state, activeKeys };
88+
}),
89+
),
90+
),
8191
Effect.forever,
8292
Effect.forkScoped,
8393
);

0 commit comments

Comments
 (0)