Skip to content

Commit eb80e70

Browse files
committed
Fix PubSub subscription race in streamStatus and restore cache invalidation on mutation failure
- Subscribe to PubSub before fetching initial status in streamStatus to close the race window where published changes could be missed between getStatus resolving and the PubSub subscription being established. - Change onSuccess to onSettled for gitRunStackedActionMutationOptions and gitPullMutationOptions so branch query cache is invalidated even when mutations partially fail. - Add refreshGitStatus to the failure path of gitRunStackedAction and use Effect.ensuring for gitPull so server-side git status broadcast fires regardless of success or failure.
1 parent e8503a9 commit eb80e70

File tree

3 files changed

+8
-5
lines changed

3 files changed

+8
-5
lines changed

apps/server/src/git/Layers/GitStatusBroadcaster.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,13 @@ export const GitStatusBroadcasterLive = Layer.effect(
127127
Effect.gen(function* () {
128128
const normalizedCwd = normalizeCwd(input.cwd);
129129
yield* ensurePoller(normalizedCwd);
130+
131+
const subscription = yield* PubSub.subscribe(changesPubSub);
130132
const initialStatus = yield* getStatus({ cwd: normalizedCwd });
131133

132134
return Stream.concat(
133135
Stream.make(initialStatus),
134-
Stream.fromPubSub(changesPubSub).pipe(
136+
Stream.fromEffectRepeat(PubSub.take(subscription)).pipe(
135137
Stream.filter((event) => event.cwd === normalizedCwd),
136138
Stream.map((event) => event.status),
137139
),

apps/server/src/ws.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
573573
[WS_METHODS.gitPull]: (input) =>
574574
observeRpcEffect(
575575
WS_METHODS.gitPull,
576-
git.pullCurrentBranch(input.cwd).pipe(Effect.tap(() => refreshGitStatus(input.cwd))),
576+
git.pullCurrentBranch(input.cwd).pipe(Effect.ensuring(refreshGitStatus(input.cwd))),
577577
{ "rpc.aggregate": "git" },
578578
),
579579
[WS_METHODS.gitRunStackedAction]: (input) =>
@@ -589,7 +589,8 @@ const WsRpcLayer = WsRpcGroup.toLayer(
589589
})
590590
.pipe(
591591
Effect.matchCauseEffect({
592-
onFailure: (cause) => Queue.failCause(queue, cause),
592+
onFailure: (cause) =>
593+
refreshGitStatus(input.cwd).pipe(Effect.andThen(Queue.failCause(queue, cause))),
593594
onSuccess: () =>
594595
refreshGitStatus(input.cwd).pipe(
595596
Effect.andThen(Queue.end(queue).pipe(Effect.asVoid)),

apps/web/src/lib/gitReactQuery.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ export function gitRunStackedActionMutationOptions(input: {
163163
...(onProgress ? [{ onProgress }] : []),
164164
);
165165
},
166-
onSuccess: async () => {
166+
onSettled: async () => {
167167
await invalidateGitBranchQueries(input.queryClient, input.cwd);
168168
},
169169
});
@@ -177,7 +177,7 @@ export function gitPullMutationOptions(input: { cwd: string | null; queryClient:
177177
if (!input.cwd) throw new Error("Git pull is unavailable.");
178178
return api.git.pull({ cwd: input.cwd });
179179
},
180-
onSuccess: async () => {
180+
onSettled: async () => {
181181
await invalidateGitBranchQueries(input.queryClient, input.cwd);
182182
},
183183
});

0 commit comments

Comments
 (0)