Skip to content

Commit 6cb70ca

Browse files
committed
Fix metrics outcome always success and remove redundant RPC wrappers
- Move withMetrics before Effect.catch in OrchestrationEngine so that command failures are properly observed by the counter/timer metrics - Remove trivial rpcEffect/rpcStream/rpcStreamEffect pass-through wrappers in ws.ts and call observeRpc* functions directly
1 parent 0672418 commit 6cb70ca

File tree

2 files changed

+51
-65
lines changed

2 files changed

+51
-65
lines changed

apps/server/src/orchestration/Layers/OrchestrationEngine.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,15 @@ const makeOrchestrationEngine = Effect.gen(function* () {
201201
}
202202
yield* Deferred.succeed(envelope.result, { sequence: committedCommand.lastSequence });
203203
}).pipe(
204+
Effect.withSpan(`orchestration.command.${envelope.command.type}`),
205+
withMetrics({
206+
counter: orchestrationCommandsTotal,
207+
timer: orchestrationCommandDuration,
208+
attributes: {
209+
commandType: envelope.command.type,
210+
aggregateKind: aggregateRef.aggregateKind,
211+
},
212+
}),
204213
Effect.catch((error) =>
205214
Effect.gen(function* () {
206215
yield* reconcileReadModelAfterDispatchFailure.pipe(
@@ -232,16 +241,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
232241
yield* Deferred.fail(envelope.result, error);
233242
}),
234243
),
235-
Effect.withSpan(`orchestration.command.${envelope.command.type}`),
236244
Effect.asVoid,
237-
withMetrics({
238-
counter: orchestrationCommandsTotal,
239-
timer: orchestrationCommandDuration,
240-
attributes: {
241-
commandType: envelope.command.type,
242-
aggregateKind: aggregateRef.aggregateKind,
243-
},
244-
}),
245245
);
246246
};
247247

apps/server/src/ws.ts

Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -76,31 +76,9 @@ const WsRpcLayer = WsRpcGroup.toLayer(
7676
};
7777
});
7878

79-
const rpcEffect = <A, E, R>(
80-
method: string,
81-
effect: Effect.Effect<A, E, R>,
82-
traceAttributes?: Readonly<Record<string, unknown>>,
83-
) => observeRpcEffect(method, effect, traceAttributes);
84-
85-
const rpcStream = <A, E, R>(
86-
method: string,
87-
stream: Stream.Stream<A, E, R>,
88-
traceAttributes?: Readonly<Record<string, unknown>>,
89-
) => observeRpcStream(method, stream, traceAttributes);
90-
91-
const rpcStreamEffect = <A, StreamError, StreamContext, EffectError, EffectContext>(
92-
method: string,
93-
effect: Effect.Effect<
94-
Stream.Stream<A, StreamError, StreamContext>,
95-
EffectError,
96-
EffectContext
97-
>,
98-
traceAttributes?: Readonly<Record<string, unknown>>,
99-
) => observeRpcStreamEffect(method, effect, traceAttributes);
100-
10179
return WsRpcGroup.of({
10280
[ORCHESTRATION_WS_METHODS.getSnapshot]: (_input) =>
103-
rpcEffect(
81+
observeRpcEffect(
10482
ORCHESTRATION_WS_METHODS.getSnapshot,
10583
projectionSnapshotQuery.getSnapshot().pipe(
10684
Effect.mapError(
@@ -114,7 +92,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
11492
{ "rpc.aggregate": "orchestration" },
11593
),
11694
[ORCHESTRATION_WS_METHODS.dispatchCommand]: (command) =>
117-
rpcEffect(
95+
observeRpcEffect(
11896
ORCHESTRATION_WS_METHODS.dispatchCommand,
11997
Effect.gen(function* () {
12098
const normalizedCommand = yield* normalizeDispatchCommand(command);
@@ -132,7 +110,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
132110
{ "rpc.aggregate": "orchestration" },
133111
),
134112
[ORCHESTRATION_WS_METHODS.getTurnDiff]: (input) =>
135-
rpcEffect(
113+
observeRpcEffect(
136114
ORCHESTRATION_WS_METHODS.getTurnDiff,
137115
checkpointDiffQuery.getTurnDiff(input).pipe(
138116
Effect.mapError(
@@ -146,7 +124,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
146124
{ "rpc.aggregate": "orchestration" },
147125
),
148126
[ORCHESTRATION_WS_METHODS.getFullThreadDiff]: (input) =>
149-
rpcEffect(
127+
observeRpcEffect(
150128
ORCHESTRATION_WS_METHODS.getFullThreadDiff,
151129
checkpointDiffQuery.getFullThreadDiff(input).pipe(
152130
Effect.mapError(
@@ -160,7 +138,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
160138
{ "rpc.aggregate": "orchestration" },
161139
),
162140
[ORCHESTRATION_WS_METHODS.replayEvents]: (input) =>
163-
rpcEffect(
141+
observeRpcEffect(
164142
ORCHESTRATION_WS_METHODS.replayEvents,
165143
Stream.runCollect(
166144
orchestrationEngine.readEvents(
@@ -179,7 +157,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
179157
{ "rpc.aggregate": "orchestration" },
180158
),
181159
[WS_METHODS.subscribeOrchestrationDomainEvents]: (_input) =>
182-
rpcStreamEffect(
160+
observeRpcStreamEffect(
183161
WS_METHODS.subscribeOrchestrationDomainEvents,
184162
Effect.gen(function* () {
185163
const snapshot = yield* orchestrationEngine.getReadModel();
@@ -238,15 +216,17 @@ const WsRpcLayer = WsRpcGroup.toLayer(
238216
{ "rpc.aggregate": "orchestration" },
239217
),
240218
[WS_METHODS.serverGetConfig]: (_input) =>
241-
rpcEffect(WS_METHODS.serverGetConfig, loadServerConfig, { "rpc.aggregate": "server" }),
219+
observeRpcEffect(WS_METHODS.serverGetConfig, loadServerConfig, {
220+
"rpc.aggregate": "server",
221+
}),
242222
[WS_METHODS.serverRefreshProviders]: (_input) =>
243-
rpcEffect(
223+
observeRpcEffect(
244224
WS_METHODS.serverRefreshProviders,
245225
providerRegistry.refresh().pipe(Effect.map((providers) => ({ providers }))),
246226
{ "rpc.aggregate": "server" },
247227
),
248228
[WS_METHODS.serverUpsertKeybinding]: (rule) =>
249-
rpcEffect(
229+
observeRpcEffect(
250230
WS_METHODS.serverUpsertKeybinding,
251231
Effect.gen(function* () {
252232
const keybindingsConfig = yield* keybindings.upsertKeybindingRule(rule);
@@ -255,15 +235,15 @@ const WsRpcLayer = WsRpcGroup.toLayer(
255235
{ "rpc.aggregate": "server" },
256236
),
257237
[WS_METHODS.serverGetSettings]: (_input) =>
258-
rpcEffect(WS_METHODS.serverGetSettings, serverSettings.getSettings, {
238+
observeRpcEffect(WS_METHODS.serverGetSettings, serverSettings.getSettings, {
259239
"rpc.aggregate": "server",
260240
}),
261241
[WS_METHODS.serverUpdateSettings]: ({ patch }) =>
262-
rpcEffect(WS_METHODS.serverUpdateSettings, serverSettings.updateSettings(patch), {
242+
observeRpcEffect(WS_METHODS.serverUpdateSettings, serverSettings.updateSettings(patch), {
263243
"rpc.aggregate": "server",
264244
}),
265245
[WS_METHODS.projectsSearchEntries]: (input) =>
266-
rpcEffect(
246+
observeRpcEffect(
267247
WS_METHODS.projectsSearchEntries,
268248
workspaceEntries.search(input).pipe(
269249
Effect.mapError(
@@ -277,7 +257,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
277257
{ "rpc.aggregate": "workspace" },
278258
),
279259
[WS_METHODS.projectsWriteFile]: (input) =>
280-
rpcEffect(
260+
observeRpcEffect(
281261
WS_METHODS.projectsWriteFile,
282262
workspaceFileSystem.writeFile(input).pipe(
283263
Effect.mapError((cause) => {
@@ -293,15 +273,19 @@ const WsRpcLayer = WsRpcGroup.toLayer(
293273
{ "rpc.aggregate": "workspace" },
294274
),
295275
[WS_METHODS.shellOpenInEditor]: (input) =>
296-
rpcEffect(WS_METHODS.shellOpenInEditor, open.openInEditor(input), {
276+
observeRpcEffect(WS_METHODS.shellOpenInEditor, open.openInEditor(input), {
297277
"rpc.aggregate": "workspace",
298278
}),
299279
[WS_METHODS.gitStatus]: (input) =>
300-
rpcEffect(WS_METHODS.gitStatus, gitManager.status(input), { "rpc.aggregate": "git" }),
280+
observeRpcEffect(WS_METHODS.gitStatus, gitManager.status(input), {
281+
"rpc.aggregate": "git",
282+
}),
301283
[WS_METHODS.gitPull]: (input) =>
302-
rpcEffect(WS_METHODS.gitPull, git.pullCurrentBranch(input.cwd), { "rpc.aggregate": "git" }),
284+
observeRpcEffect(WS_METHODS.gitPull, git.pullCurrentBranch(input.cwd), {
285+
"rpc.aggregate": "git",
286+
}),
303287
[WS_METHODS.gitRunStackedAction]: (input) =>
304-
rpcStream(
288+
observeRpcStream(
305289
WS_METHODS.gitRunStackedAction,
306290
Stream.callback<GitActionProgressEvent, GitManagerServiceError>((queue) =>
307291
gitManager
@@ -321,61 +305,63 @@ const WsRpcLayer = WsRpcGroup.toLayer(
321305
{ "rpc.aggregate": "git" },
322306
),
323307
[WS_METHODS.gitResolvePullRequest]: (input) =>
324-
rpcEffect(WS_METHODS.gitResolvePullRequest, gitManager.resolvePullRequest(input), {
308+
observeRpcEffect(WS_METHODS.gitResolvePullRequest, gitManager.resolvePullRequest(input), {
325309
"rpc.aggregate": "git",
326310
}),
327311
[WS_METHODS.gitPreparePullRequestThread]: (input) =>
328-
rpcEffect(
312+
observeRpcEffect(
329313
WS_METHODS.gitPreparePullRequestThread,
330314
gitManager.preparePullRequestThread(input),
331315
{ "rpc.aggregate": "git" },
332316
),
333317
[WS_METHODS.gitListBranches]: (input) =>
334-
rpcEffect(WS_METHODS.gitListBranches, git.listBranches(input), { "rpc.aggregate": "git" }),
318+
observeRpcEffect(WS_METHODS.gitListBranches, git.listBranches(input), {
319+
"rpc.aggregate": "git",
320+
}),
335321
[WS_METHODS.gitCreateWorktree]: (input) =>
336-
rpcEffect(WS_METHODS.gitCreateWorktree, git.createWorktree(input), {
322+
observeRpcEffect(WS_METHODS.gitCreateWorktree, git.createWorktree(input), {
337323
"rpc.aggregate": "git",
338324
}),
339325
[WS_METHODS.gitRemoveWorktree]: (input) =>
340-
rpcEffect(WS_METHODS.gitRemoveWorktree, git.removeWorktree(input), {
326+
observeRpcEffect(WS_METHODS.gitRemoveWorktree, git.removeWorktree(input), {
341327
"rpc.aggregate": "git",
342328
}),
343329
[WS_METHODS.gitCreateBranch]: (input) =>
344-
rpcEffect(WS_METHODS.gitCreateBranch, git.createBranch(input), {
330+
observeRpcEffect(WS_METHODS.gitCreateBranch, git.createBranch(input), {
345331
"rpc.aggregate": "git",
346332
}),
347333
[WS_METHODS.gitCheckout]: (input) =>
348-
rpcEffect(WS_METHODS.gitCheckout, Effect.scoped(git.checkoutBranch(input)), {
334+
observeRpcEffect(WS_METHODS.gitCheckout, Effect.scoped(git.checkoutBranch(input)), {
349335
"rpc.aggregate": "git",
350336
}),
351337
[WS_METHODS.gitInit]: (input) =>
352-
rpcEffect(WS_METHODS.gitInit, git.initRepo(input), { "rpc.aggregate": "git" }),
338+
observeRpcEffect(WS_METHODS.gitInit, git.initRepo(input), { "rpc.aggregate": "git" }),
353339
[WS_METHODS.terminalOpen]: (input) =>
354-
rpcEffect(WS_METHODS.terminalOpen, terminalManager.open(input), {
340+
observeRpcEffect(WS_METHODS.terminalOpen, terminalManager.open(input), {
355341
"rpc.aggregate": "terminal",
356342
}),
357343
[WS_METHODS.terminalWrite]: (input) =>
358-
rpcEffect(WS_METHODS.terminalWrite, terminalManager.write(input), {
344+
observeRpcEffect(WS_METHODS.terminalWrite, terminalManager.write(input), {
359345
"rpc.aggregate": "terminal",
360346
}),
361347
[WS_METHODS.terminalResize]: (input) =>
362-
rpcEffect(WS_METHODS.terminalResize, terminalManager.resize(input), {
348+
observeRpcEffect(WS_METHODS.terminalResize, terminalManager.resize(input), {
363349
"rpc.aggregate": "terminal",
364350
}),
365351
[WS_METHODS.terminalClear]: (input) =>
366-
rpcEffect(WS_METHODS.terminalClear, terminalManager.clear(input), {
352+
observeRpcEffect(WS_METHODS.terminalClear, terminalManager.clear(input), {
367353
"rpc.aggregate": "terminal",
368354
}),
369355
[WS_METHODS.terminalRestart]: (input) =>
370-
rpcEffect(WS_METHODS.terminalRestart, terminalManager.restart(input), {
356+
observeRpcEffect(WS_METHODS.terminalRestart, terminalManager.restart(input), {
371357
"rpc.aggregate": "terminal",
372358
}),
373359
[WS_METHODS.terminalClose]: (input) =>
374-
rpcEffect(WS_METHODS.terminalClose, terminalManager.close(input), {
360+
observeRpcEffect(WS_METHODS.terminalClose, terminalManager.close(input), {
375361
"rpc.aggregate": "terminal",
376362
}),
377363
[WS_METHODS.subscribeTerminalEvents]: (_input) =>
378-
rpcStream(
364+
observeRpcStream(
379365
WS_METHODS.subscribeTerminalEvents,
380366
Stream.callback<TerminalEvent>((queue) =>
381367
Effect.acquireRelease(
@@ -386,7 +372,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
386372
{ "rpc.aggregate": "terminal" },
387373
),
388374
[WS_METHODS.subscribeServerConfig]: (_input) =>
389-
rpcStreamEffect(
375+
observeRpcStreamEffect(
390376
WS_METHODS.subscribeServerConfig,
391377
Effect.gen(function* () {
392378
const keybindingsUpdates = keybindings.streamChanges.pipe(
@@ -425,7 +411,7 @@ const WsRpcLayer = WsRpcGroup.toLayer(
425411
{ "rpc.aggregate": "server" },
426412
),
427413
[WS_METHODS.subscribeServerLifecycle]: (_input) =>
428-
rpcStreamEffect(
414+
observeRpcStreamEffect(
429415
WS_METHODS.subscribeServerLifecycle,
430416
Effect.gen(function* () {
431417
const snapshot = yield* lifecycleEvents.snapshot;

0 commit comments

Comments
 (0)