From b2bd70a28f3ecc43579e84b68b59b039981bfe01 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 11:07:00 -0700 Subject: [PATCH 1/7] Export browser traces through the server - Add client OTLP tracing configuration and distributed trace headers - Proxy browser OTLP spans to upstream collectors and local trace sink - Co-authored-by: codex --- apps/server/src/http.ts | 69 +++- .../src/observability/Layers/Observability.ts | 26 +- .../src/observability/LocalFileTracer.test.ts | 6 +- .../src/observability/LocalFileTracer.ts | 17 +- .../Services/BrowserTraceCollector.ts | 13 + apps/server/src/observability/TraceRecord.ts | 341 +++++++++++++++++- apps/server/src/server.test.ts | 298 ++++++++++++++- apps/server/src/server.ts | 8 +- apps/web/src/observability/clientTracing.ts | 143 ++++++++ apps/web/src/rpc/client.test.ts | 45 +++ apps/web/src/rpc/client.ts | 13 +- apps/web/src/wsTransport.test.ts | 51 ++- apps/web/src/wsTransport.ts | 14 +- 13 files changed, 1017 insertions(+), 27 deletions(-) create mode 100644 apps/server/src/observability/Services/BrowserTraceCollector.ts create mode 100644 apps/web/src/observability/clientTracing.ts diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index cee08b4f98..8b4e7c68fc 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -1,6 +1,13 @@ import Mime from "@effect/platform-node/Mime"; -import { Effect, FileSystem, Option, Path } from "effect"; -import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"; +import { Effect, FileSystem, Layer, Option, Path } from "effect"; +import { + HttpBody, + HttpClient, + HttpClientResponse, + HttpRouter, + HttpServerResponse, + HttpServerRequest, +} from "effect/unstable/http"; import { ATTACHMENTS_ROUTE_PREFIX, @@ -9,10 +16,68 @@ import { } from "./attachmentPaths"; import { resolveAttachmentPathById } from "./attachmentStore"; import { ServerConfig } from "./config"; +import { decodeOtlpTraceRecords, OtlpTracePayloadSchema } from "./observability/TraceRecord.ts"; +import { BrowserTraceCollector } from "./observability/Services/BrowserTraceCollector.ts"; import { ProjectFaviconResolver } from "./project/Services/ProjectFaviconResolver"; const PROJECT_FAVICON_CACHE_CONTROL = "public, max-age=3600"; const FALLBACK_PROJECT_FAVICON_SVG = ``; +const OTLP_TRACES_PROXY_PATH = "/api/observability/v1/traces"; + +export const otlpTracesProxyRouteLayer = HttpRouter.add( + "POST", + OTLP_TRACES_PROXY_PATH, + Effect.gen(function* () { + const config = yield* ServerConfig; + const otlpTracesUrl = config.otlpTracesUrl; + const browserTraceCollector = yield* BrowserTraceCollector; + const httpClient = yield* HttpClient.HttpClient; + + // Get raw body + const body = yield* Effect.service(HttpServerRequest.HttpServerRequest).pipe( + Effect.flatMap((request) => request.arrayBuffer), + Effect.map((buffer) => new Uint8Array(buffer)), + ); + + // Collect traces to local trace sink + yield* HttpServerRequest.schemaBodyJson(OtlpTracePayloadSchema).pipe( + Effect.map(decodeOtlpTraceRecords), + Effect.flatMap(browserTraceCollector.record), + Effect.catch((cause) => Effect.logWarning("Failed to record browser OTLP traces", { cause })), + ); + + if (otlpTracesUrl === undefined) { + return HttpServerResponse.empty({ status: 204 }); + } + + // Forward request to remote OTLP traces endpoint + return yield* httpClient + .post(otlpTracesUrl, { + body: HttpBody.uint8Array(body), + }) + .pipe( + Effect.flatMap(HttpClientResponse.filterStatusOk), + Effect.as(HttpServerResponse.empty({ status: 204 })), + Effect.tapError((cause) => + Effect.logWarning("Failed to export browser OTLP traces", { + cause, + otlpTracesUrl, + }), + ), + Effect.catch(() => + Effect.succeed(HttpServerResponse.text("Trace export failed.", { status: 502 })), + ), + ); + }), +).pipe( + Layer.provide( + HttpRouter.cors({ + allowedMethods: ["POST", "OPTIONS"], + allowedHeaders: ["content-type"], + maxAge: 600, + }), + ), +); export const attachmentsRouteLayer = HttpRouter.add( "GET", diff --git a/apps/server/src/observability/Layers/Observability.ts b/apps/server/src/observability/Layers/Observability.ts index 29b9d7eac5..7c6a28005f 100644 --- a/apps/server/src/observability/Layers/Observability.ts +++ b/apps/server/src/observability/Layers/Observability.ts @@ -4,6 +4,8 @@ import { OtlpMetrics, OtlpSerialization, OtlpTracer } from "effect/unstable/obse import { ServerConfig } from "../../config.ts"; import { ServerLoggerLive } from "../../serverLogger.ts"; import { makeLocalFileTracer } from "../LocalFileTracer.ts"; +import { BrowserTraceCollector } from "../Services/BrowserTraceCollector.ts"; +import { makeTraceSink } from "../TraceSink.ts"; const otlpSerializationLayer = OtlpSerialization.layerJson; @@ -16,9 +18,14 @@ export const ObservabilityLive = Layer.unwrap( Layer.succeed(References.TracerTimingEnabled, config.traceTimingEnabled), ); - const tracerLayer = Layer.effect( - Tracer.Tracer, + const tracerLayer = Layer.unwrap( Effect.gen(function* () { + const sink = yield* makeTraceSink({ + filePath: config.serverTracePath, + maxBytes: config.traceMaxBytes, + maxFiles: config.traceMaxFiles, + batchWindowMs: config.traceBatchWindowMs, + }); const delegate = config.otlpTracesUrl === undefined ? undefined @@ -34,13 +41,26 @@ export const ObservabilityLive = Layer.unwrap( }, }); - return yield* makeLocalFileTracer({ + const tracer = yield* makeLocalFileTracer({ filePath: config.serverTracePath, maxBytes: config.traceMaxBytes, maxFiles: config.traceMaxFiles, batchWindowMs: config.traceBatchWindowMs, + sink, ...(delegate ? { delegate } : {}), }); + + return Layer.mergeAll( + Layer.succeed(Tracer.Tracer, tracer), + Layer.succeed(BrowserTraceCollector, { + record: (records) => + Effect.sync(() => { + for (const record of records) { + sink.push(record); + } + }), + }), + ); }), ).pipe(Layer.provideMerge(otlpSerializationLayer)); diff --git a/apps/server/src/observability/LocalFileTracer.test.ts b/apps/server/src/observability/LocalFileTracer.test.ts index 1bc5a34305..19efffaf10 100644 --- a/apps/server/src/observability/LocalFileTracer.test.ts +++ b/apps/server/src/observability/LocalFileTracer.test.ts @@ -5,7 +5,7 @@ import path from "node:path"; import { assert, describe, it } from "@effect/vitest"; import { Effect, Layer, Logger, References, Tracer } from "effect"; -import type { TraceRecord } from "./TraceRecord.ts"; +import type { EffectTraceRecord } from "./TraceRecord.ts"; import { makeLocalFileTracer } from "./LocalFileTracer.ts"; const makeTestLayer = (tracePath: string) => @@ -23,13 +23,13 @@ const makeTestLayer = (tracePath: string) => Layer.succeed(References.MinimumLogLevel, "Info"), ); -const readTraceRecords = (tracePath: string): Array => +const readTraceRecords = (tracePath: string): Array => fs .readFileSync(tracePath, "utf8") .trim() .split("\n") .filter((line) => line.length > 0) - .map((line) => JSON.parse(line) as TraceRecord); + .map((line) => JSON.parse(line) as EffectTraceRecord); describe("LocalFileTracer", () => { it.effect("writes nested spans to disk and captures log messages as span events", () => diff --git a/apps/server/src/observability/LocalFileTracer.ts b/apps/server/src/observability/LocalFileTracer.ts index 7f75eb0bf9..bb6c053875 100644 --- a/apps/server/src/observability/LocalFileTracer.ts +++ b/apps/server/src/observability/LocalFileTracer.ts @@ -2,7 +2,7 @@ import type * as Exit from "effect/Exit"; import { Effect, Option, Tracer } from "effect"; import { spanToTraceRecord } from "./TraceRecord.ts"; -import { makeTraceSink } from "./TraceSink.ts"; +import { makeTraceSink, type TraceSink } from "./TraceSink.ts"; export interface LocalFileTracerOptions { readonly filePath: string; @@ -10,6 +10,7 @@ export interface LocalFileTracerOptions { readonly maxFiles: number; readonly batchWindowMs: number; readonly delegate?: Tracer.Tracer; + readonly sink?: TraceSink; } class LocalFileSpan implements Tracer.Span { @@ -82,12 +83,14 @@ class LocalFileSpan implements Tracer.Span { export const makeLocalFileTracer = Effect.fn("makeLocalFileTracer")(function* ( options: LocalFileTracerOptions, ) { - const sink = yield* makeTraceSink({ - filePath: options.filePath, - maxBytes: options.maxBytes, - maxFiles: options.maxFiles, - batchWindowMs: options.batchWindowMs, - }); + const sink = + options.sink ?? + (yield* makeTraceSink({ + filePath: options.filePath, + maxBytes: options.maxBytes, + maxFiles: options.maxFiles, + batchWindowMs: options.batchWindowMs, + })); const delegate = options.delegate ?? diff --git a/apps/server/src/observability/Services/BrowserTraceCollector.ts b/apps/server/src/observability/Services/BrowserTraceCollector.ts new file mode 100644 index 0000000000..60e8f5fa13 --- /dev/null +++ b/apps/server/src/observability/Services/BrowserTraceCollector.ts @@ -0,0 +1,13 @@ +import { ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { TraceRecord } from "../TraceRecord.ts"; + +export interface BrowserTraceCollectorShape { + readonly record: (records: ReadonlyArray) => Effect.Effect; +} + +export class BrowserTraceCollector extends ServiceMap.Service< + BrowserTraceCollector, + BrowserTraceCollectorShape +>()("t3/observability/Services/BrowserTraceCollector") {} diff --git a/apps/server/src/observability/TraceRecord.ts b/apps/server/src/observability/TraceRecord.ts index 3aee1b267c..24f7da0d72 100644 --- a/apps/server/src/observability/TraceRecord.ts +++ b/apps/server/src/observability/TraceRecord.ts @@ -1,8 +1,8 @@ -import { Cause, Exit, Option, Tracer } from "effect"; +import { Cause, Exit, Option, Schema, Tracer } from "effect"; import { compactTraceAttributes } from "./Attributes.ts"; -export interface TraceRecord { +export interface EffectTraceRecord { readonly type: "effect-span"; readonly name: string; readonly traceId: string; @@ -38,6 +38,93 @@ export interface TraceRecord { }; } +export interface OtlpTraceRecord { + readonly type: "otlp-span"; + readonly name: string; + readonly traceId: string; + readonly spanId: string; + readonly parentSpanId?: string; + readonly sampled: boolean; + readonly kind: string; + readonly startTimeUnixNano: string; + readonly endTimeUnixNano: string; + readonly durationMs: number; + readonly attributes: Readonly>; + readonly resourceAttributes: Readonly>; + readonly scope: Readonly<{ + readonly name?: string; + readonly version?: string; + readonly attributes: Readonly>; + }>; + readonly events: ReadonlyArray<{ + readonly name: string; + readonly timeUnixNano: string; + readonly attributes: Readonly>; + }>; + readonly links: ReadonlyArray<{ + readonly traceId: string; + readonly spanId: string; + readonly attributes: Readonly>; + }>; + readonly status?: + | { + readonly code?: string; + readonly message?: string; + } + | undefined; +} + +export type TraceRecord = EffectTraceRecord | OtlpTraceRecord; + +const OtlpNumberishSchema = Schema.Union([Schema.String, Schema.Number]); +type OtlpUnknownRecord = Readonly>; + +const OtlpSpanSchema = Schema.Struct({ + traceId: Schema.optionalKey(Schema.String), + spanId: Schema.optionalKey(Schema.String), + parentSpanId: Schema.optionalKey(Schema.String), + name: Schema.optionalKey(Schema.String), + kind: Schema.optionalKey(OtlpNumberishSchema), + startTimeUnixNano: Schema.optionalKey(OtlpNumberishSchema), + endTimeUnixNano: Schema.optionalKey(OtlpNumberishSchema), + attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), + events: Schema.optionalKey(Schema.Array(Schema.Unknown)), + links: Schema.optionalKey(Schema.Array(Schema.Unknown)), + status: Schema.optionalKey(Schema.Unknown), + flags: Schema.optionalKey(OtlpNumberishSchema), +}); +type OtlpSpan = typeof OtlpSpanSchema.Type; + +const OtlpInstrumentationScopeSchema = Schema.Struct({ + name: Schema.optionalKey(Schema.String), + version: Schema.optionalKey(Schema.String), + attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), +}); + +const OtlpScopeSpansSchema = Schema.Struct({ + scope: Schema.optionalKey(OtlpInstrumentationScopeSchema), + spans: Schema.Array(OtlpSpanSchema), +}); + +const OtlpResourceSchema = Schema.Struct({ + attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), +}); + +const OtlpResourceSpansSchema = Schema.Struct({ + resource: Schema.optionalKey(OtlpResourceSchema), + scopeSpans: Schema.Array(OtlpScopeSpansSchema), +}); + +export const OtlpTracePayloadSchema = Schema.Struct({ + resourceSpans: Schema.Array(OtlpResourceSpansSchema), +}); +export type OtlpTracePayload = typeof OtlpTracePayloadSchema.Type; + +interface OtlpKeyValue { + readonly key: string; + readonly value: unknown; +} + interface SerializableSpan { readonly name: string; readonly traceId: string; @@ -53,7 +140,7 @@ interface SerializableSpan { >; } -function formatTraceExit(exit: Exit.Exit): TraceRecord["exit"] { +function formatTraceExit(exit: Exit.Exit): EffectTraceRecord["exit"] { if (Exit.isSuccess(exit)) { return { _tag: "Success" }; } @@ -69,7 +156,7 @@ function formatTraceExit(exit: Exit.Exit): TraceRecord["exit"] }; } -export function spanToTraceRecord(span: SerializableSpan): TraceRecord { +export function spanToTraceRecord(span: SerializableSpan): EffectTraceRecord { const status = span.status as Extract; const parentSpanId = Option.getOrUndefined(span.parent)?.spanId; @@ -98,3 +185,249 @@ export function spanToTraceRecord(span: SerializableSpan): TraceRecord { exit: formatTraceExit(status.exit), }; } + +const SPAN_KIND_MAP: Record = { + 1: "internal", + 2: "server", + 3: "client", + 4: "producer", + 5: "consumer", +}; + +export function decodeOtlpTraceRecords(payload: OtlpTracePayload): ReadonlyArray { + const records: Array = []; + + for (const resourceSpan of payload.resourceSpans) { + const resourceAttributes = decodeAttributes(resourceSpan.resource?.attributes ?? []); + + for (const scopeSpan of resourceSpan.scopeSpans) { + const scope = scopeSpan.scope; + const scopeAttributes = decodeAttributes(scope?.attributes ?? []); + + for (const span of scopeSpan.spans) { + const traceId = asNonEmptyString(span.traceId); + const spanId = asNonEmptyString(span.spanId); + if (!traceId || !spanId) { + continue; + } + + records.push( + otlpSpanToTraceRecord({ + resourceAttributes, + scopeAttributes, + scopeName: asNonEmptyString(scope?.name), + scopeVersion: asNonEmptyString(scope?.version), + span, + }), + ); + } + } + } + + return records; +} + +function otlpSpanToTraceRecord(input: { + readonly resourceAttributes: Readonly>; + readonly scopeAttributes: Readonly>; + readonly scopeName: string | undefined; + readonly scopeVersion: string | undefined; + readonly span: OtlpSpan; +}): OtlpTraceRecord { + const startTimeUnixNano = asString(input.span.startTimeUnixNano) ?? "0"; + const endTimeUnixNano = asString(input.span.endTimeUnixNano) ?? startTimeUnixNano; + + return { + type: "otlp-span", + name: asNonEmptyString(input.span.name) ?? "unknown", + traceId: asNonEmptyString(input.span.traceId) ?? "", + spanId: asNonEmptyString(input.span.spanId) ?? "", + ...(asNonEmptyString(input.span.parentSpanId) + ? { parentSpanId: asNonEmptyString(input.span.parentSpanId)! } + : {}), + sampled: isSampled(input.span.flags), + kind: normalizeSpanKind(input.span.kind), + startTimeUnixNano, + endTimeUnixNano, + durationMs: Number(parseBigInt(endTimeUnixNano) - parseBigInt(startTimeUnixNano)) / 1_000_000, + attributes: decodeAttributes(input.span.attributes ?? []), + resourceAttributes: input.resourceAttributes, + scope: { + ...(input.scopeName ? { name: input.scopeName } : {}), + ...(input.scopeVersion ? { version: input.scopeVersion } : {}), + attributes: input.scopeAttributes, + }, + events: decodeEvents(input.span.events ?? []), + links: decodeLinks(input.span.links ?? []), + status: decodeStatus(input.span.status), + }; +} + +function decodeStatus(input: unknown): OtlpTraceRecord["status"] { + if (!isRecord(input)) { + return undefined; + } + + const code = asNonEmptyString(input.code) ?? asString(input.code); + const message = asNonEmptyString(input.message); + if (!code && !message) { + return undefined; + } + + return { + ...(code ? { code } : {}), + ...(message ? { message } : {}), + }; +} + +function decodeEvents(input: ReadonlyArray): OtlpTraceRecord["events"] { + return input.flatMap((current) => { + if (!isRecord(current)) { + return []; + } + + return [ + { + name: asNonEmptyString(current.name) ?? "event", + timeUnixNano: asString(current.timeUnixNano) ?? "0", + attributes: decodeAttributes(asArray(current.attributes)), + }, + ]; + }); +} + +function decodeLinks(input: ReadonlyArray): OtlpTraceRecord["links"] { + return input.flatMap((current) => { + if (!isRecord(current)) { + return []; + } + + const traceId = asNonEmptyString(current.traceId); + const spanId = asNonEmptyString(current.spanId); + if (!traceId || !spanId) { + return []; + } + + return [ + { + traceId, + spanId, + attributes: decodeAttributes(asArray(current.attributes)), + }, + ]; + }); +} + +function decodeAttributes(input: ReadonlyArray): Readonly> { + const entries: Record = {}; + + for (const attribute of input) { + if (!isKeyValue(attribute)) { + continue; + } + + const key = asNonEmptyString(attribute.key); + if (!key) { + continue; + } + entries[key] = decodeValue(attribute.value); + } + + return compactTraceAttributes(entries); +} + +function decodeValue(input: unknown): unknown { + if (!isRecord(input)) { + return input ?? null; + } + if ("stringValue" in input) { + return input.stringValue; + } + if ("boolValue" in input) { + return input.boolValue; + } + if ("intValue" in input) { + return normalizeInteger(input.intValue); + } + if ("doubleValue" in input) { + return input.doubleValue; + } + if ("bytesValue" in input) { + return input.bytesValue; + } + if (isRecord(input.arrayValue)) { + return asArray(input.arrayValue.values).map((entry) => decodeValue(entry)); + } + if (isRecord(input.kvlistValue)) { + return decodeAttributes(asArray(input.kvlistValue.values)); + } + return null; +} + +function normalizeInteger(input: unknown): number | string { + if (typeof input === "number") { + return input; + } + if (typeof input !== "string") { + return String(input ?? ""); + } + + const parsed = Number(input); + return Number.isSafeInteger(parsed) ? parsed : input; +} + +function normalizeSpanKind(input: unknown): OtlpTraceRecord["kind"] { + if (typeof input === "string" && input.trim().length > 0) { + return input.trim().toLowerCase(); + } + if (typeof input === "number") { + return SPAN_KIND_MAP[input] ?? "internal"; + } + return "internal"; +} + +function isSampled(input: unknown): boolean { + if (typeof input === "number") { + return (input & 0x01) === 0x01; + } + if (typeof input === "string") { + const parsed = Number(input); + return Number.isNaN(parsed) ? true : (parsed & 0x01) === 0x01; + } + return true; +} + +function parseBigInt(input: string): bigint { + try { + return BigInt(input); + } catch { + return 0n; + } +} + +function asString(input: unknown): string | undefined { + if (typeof input === "string") { + return input; + } + if (typeof input === "number") { + return String(input); + } + return undefined; +} + +function asNonEmptyString(input: unknown): string | undefined { + const value = asString(input)?.trim(); + return value ? value : undefined; +} + +function asArray(input: unknown): ReadonlyArray { + return Array.isArray(input) ? input : []; +} + +function isRecord(input: unknown): input is OtlpUnknownRecord { + return typeof input === "object" && input !== null; +} + +function isKeyValue(input: unknown): input is OtlpKeyValue { + return isRecord(input) && typeof input.key === "string" && "value" in input; +} diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 3d8e36e0a7..f6534c2ba4 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -1,6 +1,7 @@ import * as NodeHttpServer from "@effect/platform-node/NodeHttpServer"; import * as NodeSocket from "@effect/platform-node/NodeSocket"; import * as NodeServices from "@effect/platform-node/NodeServices"; + import { CommandId, DEFAULT_SERVER_SETTINGS, @@ -22,7 +23,13 @@ import { import { assert, it } from "@effect/vitest"; import { assertFailure, assertInclude, assertTrue } from "@effect/vitest/utils"; import { Effect, FileSystem, Layer, Path, Stream } from "effect"; -import { HttpClient, HttpRouter, HttpServer } from "effect/unstable/http"; +import { + FetchHttpClient, + HttpBody, + HttpClient, + HttpRouter, + HttpServer, +} from "effect/unstable/http"; import { RpcClient, RpcSerialization } from "effect/unstable/rpc"; import { vi } from "vitest"; @@ -56,6 +63,10 @@ import { ServerLifecycleEvents, type ServerLifecycleEventsShape } from "./server import { ServerRuntimeStartup, type ServerRuntimeStartupShape } from "./serverRuntimeStartup.ts"; import { ServerSettingsService, type ServerSettingsShape } from "./serverSettings.ts"; import { TerminalManager, type TerminalManagerShape } from "./terminal/Services/Manager.ts"; +import { + BrowserTraceCollector, + type BrowserTraceCollectorShape, +} from "./observability/Services/BrowserTraceCollector.ts"; import { ProjectFaviconResolverLive } from "./project/Layers/ProjectFaviconResolver.ts"; import { ProjectSetupScriptRunner, @@ -138,6 +149,7 @@ const buildAppUnderTest = (options?: { orchestrationEngine?: Partial; projectionSnapshotQuery?: Partial; checkpointDiffQuery?: Partial; + browserTraceCollector?: Partial; serverLifecycleEvents?: Partial; serverRuntimeStartup?: Partial; }; @@ -263,6 +275,12 @@ const buildAppUnderTest = (options?: { ...options?.layers?.checkpointDiffQuery, }), ), + Layer.provide( + Layer.mock(BrowserTraceCollector)({ + record: () => Effect.void, + ...options?.layers?.browserTraceCollector, + }), + ), Layer.provide( Layer.mock(ServerLifecycleEvents)({ publish: (event) => Effect.succeed({ ...(event as any), sequence: 1 }), @@ -280,6 +298,7 @@ const buildAppUnderTest = (options?: { }), ), Layer.provide(workspaceAndProjectServicesLayer), + Layer.provideMerge(FetchHttpClient.layer), Layer.provide(layerConfig), ); @@ -437,6 +456,283 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); + it.effect("proxies browser OTLP trace exports through the server", () => + Effect.gen(function* () { + const upstreamRequests: Array<{ + readonly body: string; + readonly contentType: string | null; + }> = []; + const localTraceRecords: Array = []; + const payload = { + resourceSpans: [ + { + resource: { + attributes: [ + { + key: "service.name", + value: { stringValue: "t3-web" }, + }, + ], + }, + scopeSpans: [ + { + scope: { + name: "effect", + version: "4.0.0-beta.43", + }, + spans: [ + { + traceId: "11111111111111111111111111111111", + spanId: "2222222222222222", + parentSpanId: "3333333333333333", + name: "RpcClient.server.getSettings", + kind: 3, + startTimeUnixNano: "1000000", + endTimeUnixNano: "2000000", + attributes: [ + { + key: "rpc.method", + value: { stringValue: "server.getSettings" }, + }, + ], + events: [ + { + name: "http.request", + timeUnixNano: "1500000", + attributes: [ + { + key: "http.status_code", + value: { intValue: "200" }, + }, + ], + }, + ], + links: [], + status: { + code: "STATUS_CODE_OK", + }, + flags: 1, + }, + ], + }, + ], + }, + ], + }; + + const collector = yield* Effect.acquireRelease( + Effect.promise(async () => { + const NodeHttp = await import("node:http"); + + return await new Promise<{ + readonly close: () => Promise; + readonly url: string; + }>((resolve, reject) => { + const server = NodeHttp.createServer((request, response) => { + const chunks: Buffer[] = []; + request.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + request.on("end", () => { + upstreamRequests.push({ + body: Buffer.concat(chunks).toString("utf8"), + contentType: request.headers["content-type"] ?? null, + }); + response.statusCode = 204; + response.end(); + }); + }); + + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("Expected TCP collector address")); + return; + } + + resolve({ + url: `http://127.0.0.1:${address.port}/v1/traces`, + close: () => + new Promise((resolveClose, rejectClose) => { + server.close((error) => { + if (error) { + rejectClose(error); + return; + } + resolveClose(); + }); + }), + }); + }); + }); + }), + ({ close }) => Effect.promise(close), + ); + + yield* buildAppUnderTest({ + config: { + otlpTracesUrl: collector.url, + }, + layers: { + browserTraceCollector: { + record: (records) => + Effect.sync(() => { + localTraceRecords.push(...records); + }), + }, + }, + }); + + const response = yield* HttpClient.post("/api/observability/v1/traces", { + headers: { + "content-type": "application/json", + origin: "http://localhost:5733", + }, + body: HttpBody.text(JSON.stringify(payload), "application/json"), + }); + + assert.equal(response.status, 204); + assert.equal(response.headers["access-control-allow-origin"], "*"); + assert.deepEqual(localTraceRecords, [ + { + type: "otlp-span", + name: "RpcClient.server.getSettings", + traceId: "11111111111111111111111111111111", + spanId: "2222222222222222", + parentSpanId: "3333333333333333", + sampled: true, + kind: "client", + startTimeUnixNano: "1000000", + endTimeUnixNano: "2000000", + durationMs: 1, + attributes: { + "rpc.method": "server.getSettings", + }, + resourceAttributes: { + "service.name": "t3-web", + }, + scope: { + name: "effect", + version: "4.0.0-beta.43", + attributes: {}, + }, + events: [ + { + name: "http.request", + timeUnixNano: "1500000", + attributes: { + "http.status_code": 200, + }, + }, + ], + links: [], + status: { + code: "STATUS_CODE_OK", + }, + }, + ]); + assert.deepEqual(upstreamRequests, [ + { + body: JSON.stringify(payload), + contentType: "application/json", + }, + ]); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + + it.effect("responds to browser OTLP trace preflight requests with CORS headers", () => + Effect.gen(function* () { + yield* buildAppUnderTest(); + + const url = yield* getHttpServerUrl("/api/observability/v1/traces"); + const response = yield* Effect.promise(() => + fetch(url, { + method: "OPTIONS", + headers: { + origin: "http://localhost:5733", + "access-control-request-method": "POST", + "access-control-request-headers": "content-type", + }, + }), + ); + + assert.equal(response.status, 204); + assert.equal(response.headers.get("access-control-allow-origin"), "*"); + assert.equal(response.headers.get("access-control-allow-methods"), "POST, OPTIONS"); + assert.equal(response.headers.get("access-control-allow-headers"), "content-type"); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + + it.effect( + "stores browser OTLP trace exports locally when no upstream collector is configured", + () => + Effect.gen(function* () { + const localTraceRecords: Array = []; + yield* buildAppUnderTest({ + layers: { + browserTraceCollector: { + record: (records) => + Effect.sync(() => { + localTraceRecords.push(...records); + }), + }, + }, + }); + + const response = yield* HttpClient.post("/api/observability/v1/traces", { + headers: { + "content-type": "application/json", + }, + body: HttpBody.text( + JSON.stringify({ + resourceSpans: [ + { + scopeSpans: [ + { + spans: [ + { + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + spanId: "bbbbbbbbbbbbbbbb", + name: "client.test", + startTimeUnixNano: "1", + endTimeUnixNano: "1", + }, + ], + }, + ], + }, + ], + }), + "application/json", + ), + }); + + assert.equal(response.status, 204); + assert.deepEqual(localTraceRecords, [ + { + type: "otlp-span", + name: "client.test", + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + spanId: "bbbbbbbbbbbbbbbb", + sampled: true, + kind: "internal", + startTimeUnixNano: "1", + endTimeUnixNano: "1", + durationMs: 0, + attributes: {}, + resourceAttributes: {}, + scope: { + attributes: {}, + }, + events: [], + links: [], + status: undefined, + }, + ]); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + it.effect("returns 404 for missing attachment id lookups", () => Effect.gen(function* () { yield* buildAppUnderTest(); diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 04ffaeeeeb..f56edde6fa 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -2,7 +2,12 @@ import { Effect, Layer } from "effect"; import { FetchHttpClient, HttpRouter, HttpServer } from "effect/unstable/http"; import { ServerConfig } from "./config"; -import { attachmentsRouteLayer, projectFaviconRouteLayer, staticAndDevRouteLayer } from "./http"; +import { + attachmentsRouteLayer, + otlpTracesProxyRouteLayer, + projectFaviconRouteLayer, + staticAndDevRouteLayer, +} from "./http"; import { fixPath } from "./os-jank"; import { websocketRpcRouteLayer } from "./ws"; import { OpenLive } from "./open"; @@ -205,6 +210,7 @@ const RuntimeServicesLive = ServerRuntimeStartupLive.pipe( export const makeRoutesLayer = Layer.mergeAll( attachmentsRouteLayer, + otlpTracesProxyRouteLayer, projectFaviconRouteLayer, staticAndDevRouteLayer, websocketRpcRouteLayer, diff --git a/apps/web/src/observability/clientTracing.ts b/apps/web/src/observability/clientTracing.ts new file mode 100644 index 0000000000..0c32576a18 --- /dev/null +++ b/apps/web/src/observability/clientTracing.ts @@ -0,0 +1,143 @@ +import { Exit, Layer, ManagedRuntime, Scope, Tracer } from "effect"; +import { FetchHttpClient } from "effect/unstable/http"; +import { OtlpSerialization, OtlpTracer } from "effect/unstable/observability"; + +import { isElectron } from "../env"; +import { resolveServerUrl } from "../lib/utils"; +import { APP_VERSION } from "~/branding"; + +const DEFAULT_EXPORT_INTERVAL_MS = 1_000; +const CLIENT_TRACING_RESOURCE = { + serviceName: "t3-web", + attributes: { + "service.runtime": "t3-web", + "service.mode": isElectron ? "electron" : "browser", + "service.version": APP_VERSION, + }, +} as const; + +const delegateRuntimeLayer = Layer.mergeAll(FetchHttpClient.layer, OtlpSerialization.layerJson); + +let activeDelegate: Tracer.Tracer | null = null; +let activeRuntime: ManagedRuntime.ManagedRuntime | null = null; +let activeScope: Scope.Closeable | null = null; +let activeConfigKey: string | null = null; +let configurationGeneration = 0; +let pendingConfiguration = Promise.resolve(); + +export interface ClientTracingConfig { + readonly exportIntervalMs?: number; +} + +export const ClientTracingLive = Layer.succeed( + Tracer.Tracer, + Tracer.make({ + span(options) { + return activeDelegate?.span(options) ?? new Tracer.NativeSpan(options); + }, + }), +); + +export function configureClientTracing(config: ClientTracingConfig = {}): Promise { + if (config.exportIntervalMs === undefined && activeConfigKey !== null) { + return pendingConfiguration; + } + pendingConfiguration = pendingConfiguration.finally(() => applyClientTracingConfig(config)); + return pendingConfiguration; +} + +async function applyClientTracingConfig(config: ClientTracingConfig): Promise { + const otlpTracesUrl = resolveServerUrl({ + protocol: window.location.protocol === "https:" ? "https" : "http", + pathname: "/api/observability/v1/traces", + }); + const exportIntervalMs = Math.max(10, config.exportIntervalMs ?? DEFAULT_EXPORT_INTERVAL_MS); + const nextConfigKey = `${otlpTracesUrl}|${exportIntervalMs}`; + + if (activeConfigKey === nextConfigKey && activeDelegate !== null) { + return; + } + + activeConfigKey = nextConfigKey; + const generation = ++configurationGeneration; + + const previousRuntime = activeRuntime; + const previousScope = activeScope; + + activeDelegate = null; + activeRuntime = null; + activeScope = null; + + await disposeTracerRuntime(previousRuntime, previousScope); + + const runtime = ManagedRuntime.make(delegateRuntimeLayer); + const scope = runtime.runSync(Scope.make()); + + try { + const delegate = await runtime.runPromise( + Scope.provide(scope)( + OtlpTracer.make({ + url: otlpTracesUrl, + exportInterval: `${exportIntervalMs} millis`, + resource: CLIENT_TRACING_RESOURCE, + }), + ), + ); + + if (generation !== configurationGeneration) { + await disposeTracerRuntime(runtime, scope); + return; + } + + activeDelegate = delegate; + activeRuntime = runtime; + activeScope = scope; + } catch (error) { + await disposeTracerRuntime(runtime, scope); + + if (generation === configurationGeneration) { + console.warn("Failed to configure client tracing exporter", { + error: formatError(error), + otlpTracesUrl, + }); + } + } +} + +async function disposeTracerRuntime( + runtime: ManagedRuntime.ManagedRuntime | null, + scope: Scope.Closeable | null, +): Promise { + if (runtime === null || scope === null) { + return; + } + + await runtime + .runPromise(Scope.close(scope, Exit.void)) + .catch(() => undefined) + .finally(() => { + runtime.dispose(); + }); +} + +function formatError(error: unknown): string { + if (error instanceof Error && error.message.trim().length > 0) { + return error.message; + } + + return String(error); +} + +export async function __resetClientTracingForTests() { + configurationGeneration++; + activeConfigKey = null; + activeDelegate = null; + pendingConfiguration = Promise.resolve(); + + const runtime = activeRuntime; + const scope = activeScope; + activeRuntime = null; + activeScope = null; + + await disposeTracerRuntime(runtime, scope); +} diff --git a/apps/web/src/rpc/client.test.ts b/apps/web/src/rpc/client.test.ts index c19b85dd36..bb70adbf67 100644 --- a/apps/web/src/rpc/client.test.ts +++ b/apps/web/src/rpc/client.test.ts @@ -2,6 +2,7 @@ import { DEFAULT_SERVER_SETTINGS, WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { AsyncResult, AtomRegistry } from "effect/unstable/reactivity"; +import { configureClientTracing } from "../observability/clientTracing"; import { __resetWsRpcAtomClientForTests, runRpc, WsRpcAtomClient } from "./client"; type WsEventType = "open" | "message" | "close" | "error"; @@ -66,6 +67,7 @@ class MockWebSocket { } const originalWebSocket = globalThis.WebSocket; +const originalFetch = globalThis.fetch; function getSocket(): MockWebSocket { const socket = sockets.at(-1); @@ -112,6 +114,7 @@ beforeEach(() => { afterEach(() => { __resetWsRpcAtomClientForTests(); globalThis.WebSocket = originalWebSocket; + globalThis.fetch = originalFetch; vi.restoreAllMocks(); }); @@ -228,4 +231,46 @@ describe("WsRpcAtomClient", () => { release(); registry.dispose(); }); + + it("attaches distributed trace ids when client OTLP tracing is enabled", async () => { + await configureClientTracing({ + exportIntervalMs: 10, + }); + + const requestPromise = runRpc((client) => client(WS_METHODS.serverGetSettings, {})); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(socket.sent[0] ?? "{}") as { + id: string; + spanId?: string; + tag: string; + traceId?: string; + }; + expect(requestMessage.tag).toBe(WS_METHODS.serverGetSettings); + expect(requestMessage.traceId).toMatch(/^[0-9a-f]{32}$/); + expect(requestMessage.spanId).toMatch(/^[0-9a-f]{16}$/); + + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: DEFAULT_SERVER_SETTINGS, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual(DEFAULT_SERVER_SETTINGS); + }); }); diff --git a/apps/web/src/rpc/client.ts b/apps/web/src/rpc/client.ts index 117f7c0aaf..e4f9a6bbdd 100644 --- a/apps/web/src/rpc/client.ts +++ b/apps/web/src/rpc/client.ts @@ -2,6 +2,11 @@ import { WsRpcGroup } from "@t3tools/contracts"; import { Effect, Layer, ManagedRuntime } from "effect"; import { AtomRpc } from "effect/unstable/reactivity"; +import { + __resetClientTracingForTests, + ClientTracingLive, + configureClientTracing, +} from "../observability/clientTracing"; import { createWsRpcProtocolLayer } from "./protocol"; export class WsRpcAtomClient extends AtomRpc.Service()("WsRpcAtomClient", { @@ -16,18 +21,22 @@ function getRuntime() { return sharedRuntime; } - sharedRuntime = ManagedRuntime.make(WsRpcAtomClient.layer); + sharedRuntime = ManagedRuntime.make(Layer.mergeAll(WsRpcAtomClient.layer, ClientTracingLive)); return sharedRuntime; } export function runRpc( execute: (client: typeof WsRpcAtomClient.Service) => Effect.Effect, ): Promise { - return getRuntime().runPromise(WsRpcAtomClient.use(execute)); + return configureClientTracing().then(() => { + const runtime = getRuntime(); + return runtime.runPromise(WsRpcAtomClient.use(execute)); + }); } export async function __resetWsRpcAtomClientForTests() { const runtime = sharedRuntime; sharedRuntime = null; await runtime?.dispose(); + await __resetClientTracingForTests(); } diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index 2f64f2a692..6f4a5d1943 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -1,6 +1,10 @@ -import { WS_METHODS } from "@t3tools/contracts"; +import { DEFAULT_SERVER_SETTINGS, WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + __resetClientTracingForTests, + configureClientTracing, +} from "./observability/clientTracing"; import { WsTransport } from "./wsTransport"; type WsEventType = "open" | "message" | "close" | "error"; @@ -63,6 +67,7 @@ class MockWebSocket { } const originalWebSocket = globalThis.WebSocket; +const originalFetch = globalThis.fetch; function getSocket(): MockWebSocket { const socket = sockets.at(-1); @@ -108,6 +113,8 @@ beforeEach(() => { afterEach(() => { globalThis.WebSocket = originalWebSocket; + globalThis.fetch = originalFetch; + void __resetClientTracingForTests(); vi.restoreAllMocks(); }); @@ -440,4 +447,46 @@ describe("WsTransport", () => { expect(callOrder).toEqual(["close:start", "close:done", "runtime:dispose"]); }); + + it("propagates OTLP trace ids for ws transport requests when client tracing is enabled", async () => { + await configureClientTracing({ + exportIntervalMs: 10, + }); + + const transport = new WsTransport("ws://localhost:3020"); + const requestPromise = transport.request((client) => client[WS_METHODS.serverGetSettings]({})); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(socket.sent[0] ?? "{}") as { + id: string; + spanId?: string; + traceId?: string; + }; + expect(requestMessage.traceId).toMatch(/^[0-9a-f]{32}$/); + expect(requestMessage.spanId).toMatch(/^[0-9a-f]{16}$/); + + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: DEFAULT_SERVER_SETTINGS, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual(DEFAULT_SERVER_SETTINGS); + await transport.dispose(); + }); }); diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index 70042261d5..ee33c5134b 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -1,4 +1,4 @@ -import { Duration, Effect, Exit, ManagedRuntime, Option, Scope, Stream } from "effect"; +import { Duration, Effect, Exit, Layer, ManagedRuntime, Option, Scope, Stream } from "effect"; import { createWsRpcProtocolLayer, @@ -6,6 +6,7 @@ import { type WsRpcProtocolClient, } from "./rpc/protocol"; import { RpcClient } from "effect/unstable/rpc"; +import { ClientTracingLive, configureClientTracing } from "./observability/clientTracing"; interface SubscribeOptions { readonly retryDelay?: Duration.Input; @@ -28,10 +29,14 @@ export class WsTransport { private readonly runtime: ManagedRuntime.ManagedRuntime; private readonly clientScope: Scope.Closeable; private readonly clientPromise: Promise; + private readonly tracingReady: Promise; private disposed = false; constructor(url?: string) { - this.runtime = ManagedRuntime.make(createWsRpcProtocolLayer(url)); + this.tracingReady = configureClientTracing(); + this.runtime = ManagedRuntime.make( + Layer.mergeAll(createWsRpcProtocolLayer(url), ClientTracingLive), + ); this.clientScope = this.runtime.runSync(Scope.make()); this.clientPromise = this.runtime.runPromise( Scope.provide(this.clientScope)(makeWsRpcProtocolClient), @@ -46,6 +51,7 @@ export class WsTransport { throw new Error("Transport disposed"); } + await this.tracingReady; const client = await this.clientPromise; return await this.runtime.runPromise(Effect.suspend(() => execute(client))); } @@ -58,6 +64,7 @@ export class WsTransport { throw new Error("Transport disposed"); } + await this.tracingReady; const client = await this.clientPromise; await this.runtime.runPromise( Stream.runForEach(connect(client), (value) => @@ -84,7 +91,8 @@ export class WsTransport { let active = true; const retryDelayMs = options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS; const cancel = this.runtime.runCallback( - Effect.promise(() => this.clientPromise).pipe( + Effect.promise(() => this.tracingReady).pipe( + Effect.flatMap(() => Effect.promise(() => this.clientPromise)), Effect.flatMap((client) => Stream.runForEach(connect(client), (value) => Effect.sync(() => { From c2ab9957c65370d0030e32e3529bbe84993304b5 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 11:53:39 -0700 Subject: [PATCH 2/7] Forward browser OTLP traces as JSON - Decode Effect OTLP payloads and proxy them to the backend - Disable client tracing on the export HTTP client Co-authored-by: codex --- apps/server/src/http.ts | 22 +-- .../src/observability/LocalFileTracer.ts | 4 +- apps/server/src/observability/TraceRecord.ts | 165 ++++++++---------- apps/web/src/observability/clientTracing.ts | 8 +- apps/web/src/rpc/client.test.ts | 5 +- 5 files changed, 87 insertions(+), 117 deletions(-) diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index 8b4e7c68fc..c43b1ee835 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -1,5 +1,6 @@ import Mime from "@effect/platform-node/Mime"; import { Effect, FileSystem, Layer, Option, Path } from "effect"; +import { cast } from "effect/Function"; import { HttpBody, HttpClient, @@ -8,6 +9,7 @@ import { HttpServerResponse, HttpServerRequest, } from "effect/unstable/http"; +import { OtlpTracer } from "effect/unstable/observability"; import { ATTACHMENTS_ROUTE_PREFIX, @@ -16,7 +18,7 @@ import { } from "./attachmentPaths"; import { resolveAttachmentPathById } from "./attachmentStore"; import { ServerConfig } from "./config"; -import { decodeOtlpTraceRecords, OtlpTracePayloadSchema } from "./observability/TraceRecord.ts"; +import { decodeOtlpTraceRecords } from "./observability/TraceRecord.ts"; import { BrowserTraceCollector } from "./observability/Services/BrowserTraceCollector.ts"; import { ProjectFaviconResolver } from "./project/Services/ProjectFaviconResolver"; @@ -28,32 +30,22 @@ export const otlpTracesProxyRouteLayer = HttpRouter.add( "POST", OTLP_TRACES_PROXY_PATH, Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; const config = yield* ServerConfig; const otlpTracesUrl = config.otlpTracesUrl; const browserTraceCollector = yield* BrowserTraceCollector; const httpClient = yield* HttpClient.HttpClient; + const bodyJson = cast(yield* request.json); - // Get raw body - const body = yield* Effect.service(HttpServerRequest.HttpServerRequest).pipe( - Effect.flatMap((request) => request.arrayBuffer), - Effect.map((buffer) => new Uint8Array(buffer)), - ); - - // Collect traces to local trace sink - yield* HttpServerRequest.schemaBodyJson(OtlpTracePayloadSchema).pipe( - Effect.map(decodeOtlpTraceRecords), - Effect.flatMap(browserTraceCollector.record), - Effect.catch((cause) => Effect.logWarning("Failed to record browser OTLP traces", { cause })), - ); + yield* browserTraceCollector.record(decodeOtlpTraceRecords(bodyJson)); if (otlpTracesUrl === undefined) { return HttpServerResponse.empty({ status: 204 }); } - // Forward request to remote OTLP traces endpoint return yield* httpClient .post(otlpTracesUrl, { - body: HttpBody.uint8Array(body), + body: HttpBody.jsonUnsafe(bodyJson), }) .pipe( Effect.flatMap(HttpClientResponse.filterStatusOk), diff --git a/apps/server/src/observability/LocalFileTracer.ts b/apps/server/src/observability/LocalFileTracer.ts index bb6c053875..cde5a176e8 100644 --- a/apps/server/src/observability/LocalFileTracer.ts +++ b/apps/server/src/observability/LocalFileTracer.ts @@ -1,7 +1,7 @@ import type * as Exit from "effect/Exit"; import { Effect, Option, Tracer } from "effect"; -import { spanToTraceRecord } from "./TraceRecord.ts"; +import { EffectTraceRecord, spanToTraceRecord } from "./TraceRecord.ts"; import { makeTraceSink, type TraceSink } from "./TraceSink.ts"; export interface LocalFileTracerOptions { @@ -31,7 +31,7 @@ class LocalFileSpan implements Tracer.Span { constructor( options: Parameters[0], private readonly delegate: Tracer.Span, - private readonly push: (record: ReturnType) => void, + private readonly push: (record: EffectTraceRecord) => void, ) { this.name = delegate.name; this.spanId = delegate.spanId; diff --git a/apps/server/src/observability/TraceRecord.ts b/apps/server/src/observability/TraceRecord.ts index 24f7da0d72..81b7bcbc49 100644 --- a/apps/server/src/observability/TraceRecord.ts +++ b/apps/server/src/observability/TraceRecord.ts @@ -1,29 +1,37 @@ -import { Cause, Exit, Option, Schema, Tracer } from "effect"; +import { Cause, Exit, Option, Predicate, Tracer } from "effect"; import { compactTraceAttributes } from "./Attributes.ts"; +import { OtlpTracer } from "effect/unstable/observability"; -export interface EffectTraceRecord { - readonly type: "effect-span"; +interface TraceRecordEvent { + readonly name: string; + readonly timeUnixNano: string; + readonly attributes: Readonly>; +} + +interface TraceRecordLink { + readonly traceId: string; + readonly spanId: string; + readonly attributes: Readonly>; +} + +interface BaseTraceRecord { readonly name: string; + readonly kind: string; readonly traceId: string; readonly spanId: string; readonly parentSpanId?: string; readonly sampled: boolean; - readonly kind: Tracer.SpanKind; readonly startTimeUnixNano: string; readonly endTimeUnixNano: string; readonly durationMs: number; readonly attributes: Readonly>; - readonly events: ReadonlyArray<{ - readonly name: string; - readonly timeUnixNano: string; - readonly attributes: Readonly>; - }>; - readonly links: ReadonlyArray<{ - readonly traceId: string; - readonly spanId: string; - readonly attributes: Readonly>; - }>; + readonly events: ReadonlyArray; + readonly links: ReadonlyArray; +} + +export interface EffectTraceRecord extends BaseTraceRecord { + readonly type: "effect-span"; readonly exit: | { readonly _tag: "Success"; @@ -38,34 +46,14 @@ export interface EffectTraceRecord { }; } -export interface OtlpTraceRecord { +interface OtlpTraceRecord extends BaseTraceRecord { readonly type: "otlp-span"; - readonly name: string; - readonly traceId: string; - readonly spanId: string; - readonly parentSpanId?: string; - readonly sampled: boolean; - readonly kind: string; - readonly startTimeUnixNano: string; - readonly endTimeUnixNano: string; - readonly durationMs: number; - readonly attributes: Readonly>; readonly resourceAttributes: Readonly>; readonly scope: Readonly<{ readonly name?: string; readonly version?: string; readonly attributes: Readonly>; }>; - readonly events: ReadonlyArray<{ - readonly name: string; - readonly timeUnixNano: string; - readonly attributes: Readonly>; - }>; - readonly links: ReadonlyArray<{ - readonly traceId: string; - readonly spanId: string; - readonly attributes: Readonly>; - }>; readonly status?: | { readonly code?: string; @@ -76,55 +64,6 @@ export interface OtlpTraceRecord { export type TraceRecord = EffectTraceRecord | OtlpTraceRecord; -const OtlpNumberishSchema = Schema.Union([Schema.String, Schema.Number]); -type OtlpUnknownRecord = Readonly>; - -const OtlpSpanSchema = Schema.Struct({ - traceId: Schema.optionalKey(Schema.String), - spanId: Schema.optionalKey(Schema.String), - parentSpanId: Schema.optionalKey(Schema.String), - name: Schema.optionalKey(Schema.String), - kind: Schema.optionalKey(OtlpNumberishSchema), - startTimeUnixNano: Schema.optionalKey(OtlpNumberishSchema), - endTimeUnixNano: Schema.optionalKey(OtlpNumberishSchema), - attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), - events: Schema.optionalKey(Schema.Array(Schema.Unknown)), - links: Schema.optionalKey(Schema.Array(Schema.Unknown)), - status: Schema.optionalKey(Schema.Unknown), - flags: Schema.optionalKey(OtlpNumberishSchema), -}); -type OtlpSpan = typeof OtlpSpanSchema.Type; - -const OtlpInstrumentationScopeSchema = Schema.Struct({ - name: Schema.optionalKey(Schema.String), - version: Schema.optionalKey(Schema.String), - attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), -}); - -const OtlpScopeSpansSchema = Schema.Struct({ - scope: Schema.optionalKey(OtlpInstrumentationScopeSchema), - spans: Schema.Array(OtlpSpanSchema), -}); - -const OtlpResourceSchema = Schema.Struct({ - attributes: Schema.optionalKey(Schema.Array(Schema.Unknown)), -}); - -const OtlpResourceSpansSchema = Schema.Struct({ - resource: Schema.optionalKey(OtlpResourceSchema), - scopeSpans: Schema.Array(OtlpScopeSpansSchema), -}); - -export const OtlpTracePayloadSchema = Schema.Struct({ - resourceSpans: Schema.Array(OtlpResourceSpansSchema), -}); -export type OtlpTracePayload = typeof OtlpTracePayloadSchema.Type; - -interface OtlpKeyValue { - readonly key: string; - readonly value: unknown; -} - interface SerializableSpan { readonly name: string; readonly traceId: string; @@ -194,15 +133,16 @@ const SPAN_KIND_MAP: Record = { 5: "consumer", }; -export function decodeOtlpTraceRecords(payload: OtlpTracePayload): ReadonlyArray { +export function decodeOtlpTraceRecords( + payload: OtlpTracer.TraceData, +): ReadonlyArray { const records: Array = []; for (const resourceSpan of payload.resourceSpans) { const resourceAttributes = decodeAttributes(resourceSpan.resource?.attributes ?? []); for (const scopeSpan of resourceSpan.scopeSpans) { - const scope = scopeSpan.scope; - const scopeAttributes = decodeAttributes(scope?.attributes ?? []); + const scopeAttributes = decodeScopeAttributes(scopeSpan); for (const span of scopeSpan.spans) { const traceId = asNonEmptyString(span.traceId); @@ -215,8 +155,8 @@ export function decodeOtlpTraceRecords(payload: OtlpTracePayload): ReadonlyArray otlpSpanToTraceRecord({ resourceAttributes, scopeAttributes, - scopeName: asNonEmptyString(scope?.name), - scopeVersion: asNonEmptyString(scope?.version), + scopeName: decodeScopeName(scopeSpan), + scopeVersion: decodeScopeVersion(scopeSpan), span, }), ); @@ -227,12 +167,36 @@ export function decodeOtlpTraceRecords(payload: OtlpTracePayload): ReadonlyArray return records; } +function decodeScopeName(scopeSpan: OtlpTracer.ScopeSpan): string | undefined { + if (!isRecord(scopeSpan.scope)) { + return undefined; + } + + return asNonEmptyString(scopeSpan.scope.name); +} + +function decodeScopeAttributes(scopeSpan: OtlpTracer.ScopeSpan): Readonly> { + if (!isRecord(scopeSpan.scope)) { + return {}; + } + + return decodeAttributes(asArray(scopeSpan.scope.attributes)); +} + +function decodeScopeVersion(scopeSpan: OtlpTracer.ScopeSpan): string | undefined { + if (!isRecord(scopeSpan.scope)) { + return undefined; + } + + return asNonEmptyString(scopeSpan.scope.version); +} + function otlpSpanToTraceRecord(input: { readonly resourceAttributes: Readonly>; readonly scopeAttributes: Readonly>; readonly scopeName: string | undefined; readonly scopeVersion: string | undefined; - readonly span: OtlpSpan; + readonly span: OtlpTracer.ScopeSpan["spans"][number]; }): OtlpTraceRecord { const startTimeUnixNano = asString(input.span.startTimeUnixNano) ?? "0"; const endTimeUnixNano = asString(input.span.endTimeUnixNano) ?? startTimeUnixNano; @@ -245,7 +209,7 @@ function otlpSpanToTraceRecord(input: { ...(asNonEmptyString(input.span.parentSpanId) ? { parentSpanId: asNonEmptyString(input.span.parentSpanId)! } : {}), - sampled: isSampled(input.span.flags), + sampled: isSampled(decodeSpanFlags(input.span)), kind: normalizeSpanKind(input.span.kind), startTimeUnixNano, endTimeUnixNano, @@ -263,6 +227,14 @@ function otlpSpanToTraceRecord(input: { }; } +function decodeSpanFlags(span: OtlpTracer.ScopeSpan["spans"][number]): unknown { + if (!isRecord(span)) { + return undefined; + } + + return span.flags; +} + function decodeStatus(input: unknown): OtlpTraceRecord["status"] { if (!isRecord(input)) { return undefined; @@ -424,10 +396,9 @@ function asArray(input: unknown): ReadonlyArray { return Array.isArray(input) ? input : []; } -function isRecord(input: unknown): input is OtlpUnknownRecord { - return typeof input === "object" && input !== null; -} +const isRecord = Predicate.isObject; -function isKeyValue(input: unknown): input is OtlpKeyValue { - return isRecord(input) && typeof input.key === "string" && "value" in input; -} +const isKeyValue = Predicate.compose( + Predicate.isObject, + Predicate.and(Predicate.hasProperty("key"), Predicate.hasProperty("value")), +); diff --git a/apps/web/src/observability/clientTracing.ts b/apps/web/src/observability/clientTracing.ts index 0c32576a18..aa9d50c114 100644 --- a/apps/web/src/observability/clientTracing.ts +++ b/apps/web/src/observability/clientTracing.ts @@ -1,5 +1,5 @@ import { Exit, Layer, ManagedRuntime, Scope, Tracer } from "effect"; -import { FetchHttpClient } from "effect/unstable/http"; +import { FetchHttpClient, HttpClient } from "effect/unstable/http"; import { OtlpSerialization, OtlpTracer } from "effect/unstable/observability"; import { isElectron } from "../env"; @@ -16,7 +16,11 @@ const CLIENT_TRACING_RESOURCE = { }, } as const; -const delegateRuntimeLayer = Layer.mergeAll(FetchHttpClient.layer, OtlpSerialization.layerJson); +const delegateRuntimeLayer = Layer.mergeAll( + FetchHttpClient.layer, + OtlpSerialization.layerJson, + Layer.succeed(HttpClient.TracerDisabledWhen, () => true), +); let activeDelegate: Tracer.Tracer | null = null; let activeRuntime: ManagedRuntime.ManagedRuntime | null = null; diff --git a/apps/web/src/rpc/client.test.ts b/apps/web/src/rpc/client.test.ts index bb70adbf67..8e3e1a281d 100644 --- a/apps/web/src/rpc/client.test.ts +++ b/apps/web/src/rpc/client.test.ts @@ -2,7 +2,10 @@ import { DEFAULT_SERVER_SETTINGS, WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { AsyncResult, AtomRegistry } from "effect/unstable/reactivity"; -import { configureClientTracing } from "../observability/clientTracing"; +import { + configureClientTracing, + __resetClientTracingForTests, +} from "../observability/clientTracing"; import { __resetWsRpcAtomClientForTests, runRpc, WsRpcAtomClient } from "./client"; type WsEventType = "open" | "message" | "close" | "error"; From 4dedcd5c2fc7d8adb87a9408dc4f60f930bc58af Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 12:21:45 -0700 Subject: [PATCH 3/7] simplify parser --- apps/server/src/observability/TraceRecord.ts | 236 +++++-------------- 1 file changed, 56 insertions(+), 180 deletions(-) diff --git a/apps/server/src/observability/TraceRecord.ts b/apps/server/src/observability/TraceRecord.ts index 81b7bcbc49..6ea1695a6f 100644 --- a/apps/server/src/observability/TraceRecord.ts +++ b/apps/server/src/observability/TraceRecord.ts @@ -1,7 +1,7 @@ -import { Cause, Exit, Option, Predicate, Tracer } from "effect"; +import { Cause, Exit, Option, Tracer } from "effect"; import { compactTraceAttributes } from "./Attributes.ts"; -import { OtlpTracer } from "effect/unstable/observability"; +import { OtlpResource, OtlpTracer } from "effect/unstable/observability"; interface TraceRecordEvent { readonly name: string; @@ -64,6 +64,11 @@ interface OtlpTraceRecord extends BaseTraceRecord { export type TraceRecord = EffectTraceRecord | OtlpTraceRecord; +type OtlpSpan = OtlpTracer.ScopeSpan["spans"][number]; +type OtlpSpanEvent = OtlpSpan["events"][number]; +type OtlpSpanLink = OtlpSpan["links"][number]; +type OtlpSpanStatus = OtlpSpan["status"]; + interface SerializableSpan { readonly name: string; readonly traceId: string; @@ -142,21 +147,13 @@ export function decodeOtlpTraceRecords( const resourceAttributes = decodeAttributes(resourceSpan.resource?.attributes ?? []); for (const scopeSpan of resourceSpan.scopeSpans) { - const scopeAttributes = decodeScopeAttributes(scopeSpan); - for (const span of scopeSpan.spans) { - const traceId = asNonEmptyString(span.traceId); - const spanId = asNonEmptyString(span.spanId); - if (!traceId || !spanId) { - continue; - } - records.push( otlpSpanToTraceRecord({ resourceAttributes, - scopeAttributes, - scopeName: decodeScopeName(scopeSpan), - scopeVersion: decodeScopeVersion(scopeSpan), + scopeAttributes: {}, + scopeName: scopeSpan.scope.name, + scopeVersion: undefined, span, }), ); @@ -167,150 +164,84 @@ export function decodeOtlpTraceRecords( return records; } -function decodeScopeName(scopeSpan: OtlpTracer.ScopeSpan): string | undefined { - if (!isRecord(scopeSpan.scope)) { - return undefined; - } - - return asNonEmptyString(scopeSpan.scope.name); -} - -function decodeScopeAttributes(scopeSpan: OtlpTracer.ScopeSpan): Readonly> { - if (!isRecord(scopeSpan.scope)) { - return {}; - } - - return decodeAttributes(asArray(scopeSpan.scope.attributes)); -} - -function decodeScopeVersion(scopeSpan: OtlpTracer.ScopeSpan): string | undefined { - if (!isRecord(scopeSpan.scope)) { - return undefined; - } - - return asNonEmptyString(scopeSpan.scope.version); -} - function otlpSpanToTraceRecord(input: { readonly resourceAttributes: Readonly>; readonly scopeAttributes: Readonly>; readonly scopeName: string | undefined; readonly scopeVersion: string | undefined; - readonly span: OtlpTracer.ScopeSpan["spans"][number]; + readonly span: OtlpSpan; }): OtlpTraceRecord { - const startTimeUnixNano = asString(input.span.startTimeUnixNano) ?? "0"; - const endTimeUnixNano = asString(input.span.endTimeUnixNano) ?? startTimeUnixNano; - return { type: "otlp-span", - name: asNonEmptyString(input.span.name) ?? "unknown", - traceId: asNonEmptyString(input.span.traceId) ?? "", - spanId: asNonEmptyString(input.span.spanId) ?? "", - ...(asNonEmptyString(input.span.parentSpanId) - ? { parentSpanId: asNonEmptyString(input.span.parentSpanId)! } - : {}), - sampled: isSampled(decodeSpanFlags(input.span)), + name: input.span.name, + traceId: input.span.traceId, + spanId: input.span.spanId, + ...(input.span.parentSpanId ? { parentSpanId: input.span.parentSpanId } : {}), + sampled: true, kind: normalizeSpanKind(input.span.kind), - startTimeUnixNano, - endTimeUnixNano, - durationMs: Number(parseBigInt(endTimeUnixNano) - parseBigInt(startTimeUnixNano)) / 1_000_000, - attributes: decodeAttributes(input.span.attributes ?? []), + startTimeUnixNano: input.span.startTimeUnixNano, + endTimeUnixNano: input.span.endTimeUnixNano, + durationMs: + Number(parseBigInt(input.span.endTimeUnixNano) - parseBigInt(input.span.startTimeUnixNano)) / + 1_000_000, + attributes: decodeAttributes(input.span.attributes), resourceAttributes: input.resourceAttributes, scope: { ...(input.scopeName ? { name: input.scopeName } : {}), ...(input.scopeVersion ? { version: input.scopeVersion } : {}), attributes: input.scopeAttributes, }, - events: decodeEvents(input.span.events ?? []), - links: decodeLinks(input.span.links ?? []), + events: decodeEvents(input.span.events), + links: decodeLinks(input.span.links), status: decodeStatus(input.span.status), }; } -function decodeSpanFlags(span: OtlpTracer.ScopeSpan["spans"][number]): unknown { - if (!isRecord(span)) { - return undefined; - } - - return span.flags; -} - -function decodeStatus(input: unknown): OtlpTraceRecord["status"] { - if (!isRecord(input)) { - return undefined; - } - - const code = asNonEmptyString(input.code) ?? asString(input.code); - const message = asNonEmptyString(input.message); - if (!code && !message) { - return undefined; - } +function decodeStatus(input: OtlpSpanStatus): OtlpTraceRecord["status"] { + const code = String(input.code); + const message = input.message; return { - ...(code ? { code } : {}), + code, ...(message ? { message } : {}), }; } -function decodeEvents(input: ReadonlyArray): OtlpTraceRecord["events"] { - return input.flatMap((current) => { - if (!isRecord(current)) { - return []; - } - - return [ - { - name: asNonEmptyString(current.name) ?? "event", - timeUnixNano: asString(current.timeUnixNano) ?? "0", - attributes: decodeAttributes(asArray(current.attributes)), - }, - ]; - }); +function decodeEvents(input: ReadonlyArray): ReadonlyArray { + return input.map((current) => ({ + name: current.name, + timeUnixNano: current.timeUnixNano, + attributes: decodeAttributes(current.attributes), + })); } -function decodeLinks(input: ReadonlyArray): OtlpTraceRecord["links"] { +function decodeLinks(input: ReadonlyArray): ReadonlyArray { return input.flatMap((current) => { - if (!isRecord(current)) { - return []; - } - - const traceId = asNonEmptyString(current.traceId); - const spanId = asNonEmptyString(current.spanId); - if (!traceId || !spanId) { - return []; - } - - return [ - { - traceId, - spanId, - attributes: decodeAttributes(asArray(current.attributes)), - }, - ]; + const traceId = current.traceId; + const spanId = current.spanId; + return { + traceId, + spanId, + attributes: decodeAttributes(current.attributes), + }; }); } -function decodeAttributes(input: ReadonlyArray): Readonly> { +function decodeAttributes( + input: ReadonlyArray, +): Readonly> { const entries: Record = {}; for (const attribute of input) { - if (!isKeyValue(attribute)) { - continue; - } - - const key = asNonEmptyString(attribute.key); - if (!key) { - continue; - } - entries[key] = decodeValue(attribute.value); + entries[attribute.key] = decodeValue(attribute.value); } return compactTraceAttributes(entries); } -function decodeValue(input: unknown): unknown { - if (!isRecord(input)) { - return input ?? null; +function decodeValue(input: OtlpResource.AnyValue | null | undefined): unknown { + if (input == null) { + return null; } if ("stringValue" in input) { return input.stringValue; @@ -319,7 +250,7 @@ function decodeValue(input: unknown): unknown { return input.boolValue; } if ("intValue" in input) { - return normalizeInteger(input.intValue); + return input.intValue; } if ("doubleValue" in input) { return input.doubleValue; @@ -327,46 +258,17 @@ function decodeValue(input: unknown): unknown { if ("bytesValue" in input) { return input.bytesValue; } - if (isRecord(input.arrayValue)) { - return asArray(input.arrayValue.values).map((entry) => decodeValue(entry)); + if (input.arrayValue) { + return input.arrayValue.values.map((entry) => decodeValue(entry)); } - if (isRecord(input.kvlistValue)) { - return decodeAttributes(asArray(input.kvlistValue.values)); + if (input.kvlistValue) { + return decodeAttributes(input.kvlistValue.values); } return null; } -function normalizeInteger(input: unknown): number | string { - if (typeof input === "number") { - return input; - } - if (typeof input !== "string") { - return String(input ?? ""); - } - - const parsed = Number(input); - return Number.isSafeInteger(parsed) ? parsed : input; -} - -function normalizeSpanKind(input: unknown): OtlpTraceRecord["kind"] { - if (typeof input === "string" && input.trim().length > 0) { - return input.trim().toLowerCase(); - } - if (typeof input === "number") { - return SPAN_KIND_MAP[input] ?? "internal"; - } - return "internal"; -} - -function isSampled(input: unknown): boolean { - if (typeof input === "number") { - return (input & 0x01) === 0x01; - } - if (typeof input === "string") { - const parsed = Number(input); - return Number.isNaN(parsed) ? true : (parsed & 0x01) === 0x01; - } - return true; +function normalizeSpanKind(input: number): OtlpTraceRecord["kind"] { + return SPAN_KIND_MAP[input] || "internal"; } function parseBigInt(input: string): bigint { @@ -376,29 +278,3 @@ function parseBigInt(input: string): bigint { return 0n; } } - -function asString(input: unknown): string | undefined { - if (typeof input === "string") { - return input; - } - if (typeof input === "number") { - return String(input); - } - return undefined; -} - -function asNonEmptyString(input: unknown): string | undefined { - const value = asString(input)?.trim(); - return value ? value : undefined; -} - -function asArray(input: unknown): ReadonlyArray { - return Array.isArray(input) ? input : []; -} - -const isRecord = Predicate.isObject; - -const isKeyValue = Predicate.compose( - Predicate.isObject, - Predicate.and(Predicate.hasProperty("key"), Predicate.hasProperty("value")), -); From 472dd2620148b8d237b3056889b9be23ca08c784 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 12:23:22 -0700 Subject: [PATCH 4/7] kewl --- apps/server/src/http.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index c43b1ee835..5b016e604e 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -37,7 +37,14 @@ export const otlpTracesProxyRouteLayer = HttpRouter.add( const httpClient = yield* HttpClient.HttpClient; const bodyJson = cast(yield* request.json); - yield* browserTraceCollector.record(decodeOtlpTraceRecords(bodyJson)); + yield* Effect.try({ + try: () => decodeOtlpTraceRecords(bodyJson), + catch: (cause) => + Effect.logWarning("Failed to decode browser OTLP traces", { + cause, + bodyJson, + }), + }).pipe(Effect.flatMap((records) => browserTraceCollector.record(records))); if (otlpTracesUrl === undefined) { return HttpServerResponse.empty({ status: 204 }); From d0ecdb8f58f7455b4b9069ab8eeac984471a209d Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 12:29:02 -0700 Subject: [PATCH 5/7] fix error --- apps/server/src/http.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index 5b016e604e..ca4a2c22ef 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -1,5 +1,5 @@ import Mime from "@effect/platform-node/Mime"; -import { Effect, FileSystem, Layer, Option, Path } from "effect"; +import { Data, Effect, FileSystem, Layer, Option, Path } from "effect"; import { cast } from "effect/Function"; import { HttpBody, @@ -26,6 +26,11 @@ const PROJECT_FAVICON_CACHE_CONTROL = "public, max-age=3600"; const FALLBACK_PROJECT_FAVICON_SVG = ``; const OTLP_TRACES_PROXY_PATH = "/api/observability/v1/traces"; +class DecodeOtlpTraceRecordsError extends Data.TaggedError("DecodeOtlpTraceRecordsError")<{ + readonly cause: unknown; + readonly bodyJson: OtlpTracer.TraceData; +}> {} + export const otlpTracesProxyRouteLayer = HttpRouter.add( "POST", OTLP_TRACES_PROXY_PATH, @@ -39,12 +44,16 @@ export const otlpTracesProxyRouteLayer = HttpRouter.add( yield* Effect.try({ try: () => decodeOtlpTraceRecords(bodyJson), - catch: (cause) => + catch: (cause) => new DecodeOtlpTraceRecordsError({ cause, bodyJson }), + }).pipe( + Effect.flatMap((records) => browserTraceCollector.record(records)), + Effect.catch((cause) => Effect.logWarning("Failed to decode browser OTLP traces", { cause, bodyJson, }), - }).pipe(Effect.flatMap((records) => browserTraceCollector.record(records))); + ), + ); if (otlpTracesUrl === undefined) { return HttpServerResponse.empty({ status: 204 }); From 0c7d79bddfe8cb0da30963a1b15361096ea142f2 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 12:30:37 -0700 Subject: [PATCH 6/7] kewl --- apps/server/src/observability/TraceRecord.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/server/src/observability/TraceRecord.ts b/apps/server/src/observability/TraceRecord.ts index 6ea1695a6f..a9a598288b 100644 --- a/apps/server/src/observability/TraceRecord.ts +++ b/apps/server/src/observability/TraceRecord.ts @@ -151,9 +151,16 @@ export function decodeOtlpTraceRecords( records.push( otlpSpanToTraceRecord({ resourceAttributes, - scopeAttributes: {}, + scopeAttributes: decodeAttributes( + "attributes" in scopeSpan.scope && Array.isArray(scopeSpan.scope.attributes) + ? scopeSpan.scope.attributes + : [], + ), scopeName: scopeSpan.scope.name, - scopeVersion: undefined, + scopeVersion: + "version" in scopeSpan.scope && typeof scopeSpan.scope.version === "string" + ? scopeSpan.scope.version + : undefined, span, }), ); From c2e0cd0d0f7cda1fcf4160eeaa0cff4e48cc1613 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 12:46:40 -0700 Subject: [PATCH 7/7] Test browser OTLP span export - Generate a real browser OTLP payload in the server test - Assert exported span metadata matches the server trace record --- apps/server/src/server.test.ts | 200 +++++++++++++++++++++++++-------- 1 file changed, 155 insertions(+), 45 deletions(-) diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index f6534c2ba4..7a23058fc7 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -22,7 +22,7 @@ import { } from "@t3tools/contracts"; import { assert, it } from "@effect/vitest"; import { assertFailure, assertInclude, assertTrue } from "@effect/vitest/utils"; -import { Effect, FileSystem, Layer, Path, Stream } from "effect"; +import { Effect, FileSystem, Layer, ManagedRuntime, Path, Stream } from "effect"; import { FetchHttpClient, HttpBody, @@ -30,6 +30,7 @@ import { HttpRouter, HttpServer, } from "effect/unstable/http"; +import { OtlpSerialization, OtlpTracer } from "effect/unstable/observability"; import { RpcClient, RpcSerialization } from "effect/unstable/rpc"; import { vi } from "vitest"; @@ -135,6 +136,113 @@ const workspaceAndProjectServicesLayer = Layer.mergeAll( ProjectFaviconResolverLive, ); +const browserOtlpTracingLayer = Layer.mergeAll( + FetchHttpClient.layer, + OtlpSerialization.layerJson, + Layer.succeed(HttpClient.TracerDisabledWhen, () => true), +); + +const makeBrowserOtlpPayload = (spanName: string) => + Effect.gen(function* () { + const collector = yield* Effect.acquireRelease( + Effect.promise(async () => { + const NodeHttp = await import("node:http"); + + return await new Promise<{ + readonly close: () => Promise; + readonly firstRequest: Promise<{ + readonly body: string; + readonly contentType: string | null; + }>; + readonly url: string; + }>((resolve, reject) => { + let resolveFirstRequest: + | ((request: { readonly body: string; readonly contentType: string | null }) => void) + | undefined; + const firstRequest = new Promise<{ + readonly body: string; + readonly contentType: string | null; + }>((resolveRequest) => { + resolveFirstRequest = resolveRequest; + }); + + const server = NodeHttp.createServer((request, response) => { + const chunks: Buffer[] = []; + request.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + request.on("end", () => { + resolveFirstRequest?.({ + body: Buffer.concat(chunks).toString("utf8"), + contentType: request.headers["content-type"] ?? null, + }); + resolveFirstRequest = undefined; + response.statusCode = 204; + response.end(); + }); + }); + + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("Expected TCP collector address")); + return; + } + + resolve({ + url: `http://127.0.0.1:${address.port}/v1/traces`, + firstRequest, + close: () => + new Promise((resolveClose, rejectClose) => { + server.close((error) => { + if (error) { + rejectClose(error); + return; + } + resolveClose(); + }); + }), + }); + }); + }); + }), + ({ close }) => Effect.promise(close), + ); + + const runtime = ManagedRuntime.make( + OtlpTracer.layer({ + url: collector.url, + exportInterval: "10 millis", + resource: { + serviceName: "t3-web", + attributes: { + "service.runtime": "t3-web", + "service.mode": "browser", + "service.version": "test", + }, + }, + }).pipe(Layer.provide(browserOtlpTracingLayer)), + ); + + try { + yield* Effect.promise(() => runtime.runPromise(Effect.void.pipe(Effect.withSpan(spanName)))); + } finally { + yield* Effect.promise(() => runtime.dispose()); + } + + const request = yield* Effect.promise(() => + Promise.race([ + collector.firstRequest, + new Promise((_, reject) => { + setTimeout(() => reject(new Error("Timed out waiting for OTLP trace export")), 1_000); + }), + ]), + ); + + return JSON.parse(request.body) as OtlpTracer.TraceData; + }); + const buildAppUnderTest = (options?: { config?: Partial; layers?: { @@ -622,7 +730,7 @@ it.layer(NodeServices.layer)("server router seam", (it) => { name: "http.request", timeUnixNano: "1500000", attributes: { - "http.status_code": 200, + "http.status_code": "200", }, }, ], @@ -669,6 +777,18 @@ it.layer(NodeServices.layer)("server router seam", (it) => { () => Effect.gen(function* () { const localTraceRecords: Array = []; + const payload = yield* makeBrowserOtlpPayload("client.test"); + const resourceSpan = payload.resourceSpans[0]; + const scopeSpan = resourceSpan?.scopeSpans[0]; + const span = scopeSpan?.spans[0]; + + assert.notEqual(resourceSpan, undefined); + assert.notEqual(scopeSpan, undefined); + assert.notEqual(span, undefined); + if (!resourceSpan || !scopeSpan || !span) { + return; + } + yield* buildAppUnderTest({ layers: { browserTraceCollector: { @@ -684,52 +804,42 @@ it.layer(NodeServices.layer)("server router seam", (it) => { headers: { "content-type": "application/json", }, - body: HttpBody.text( - JSON.stringify({ - resourceSpans: [ - { - scopeSpans: [ - { - spans: [ - { - traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - spanId: "bbbbbbbbbbbbbbbb", - name: "client.test", - startTimeUnixNano: "1", - endTimeUnixNano: "1", - }, - ], - }, - ], - }, - ], - }), - "application/json", - ), + body: HttpBody.text(JSON.stringify(payload), "application/json"), }); assert.equal(response.status, 204); - assert.deepEqual(localTraceRecords, [ - { - type: "otlp-span", - name: "client.test", - traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - spanId: "bbbbbbbbbbbbbbbb", - sampled: true, - kind: "internal", - startTimeUnixNano: "1", - endTimeUnixNano: "1", - durationMs: 0, - attributes: {}, - resourceAttributes: {}, - scope: { - attributes: {}, - }, - events: [], - links: [], - status: undefined, - }, - ]); + assert.equal(localTraceRecords.length, 1); + const record = localTraceRecords[0] as { + readonly type: string; + readonly name: string; + readonly traceId: string; + readonly spanId: string; + readonly kind: string; + readonly attributes: Readonly>; + readonly events: ReadonlyArray; + readonly links: ReadonlyArray; + readonly scope: { + readonly name?: string; + readonly attributes: Readonly>; + }; + readonly resourceAttributes: Readonly>; + readonly status?: { + readonly code?: string; + }; + }; + + assert.equal(record.type, "otlp-span"); + assert.equal(record.name, span.name); + assert.equal(record.traceId, span.traceId); + assert.equal(record.spanId, span.spanId); + assert.equal(record.kind, "internal"); + assert.deepEqual(record.attributes, {}); + assert.deepEqual(record.events, []); + assert.deepEqual(record.links, []); + assert.equal(record.scope.name, scopeSpan.scope.name); + assert.deepEqual(record.scope.attributes, {}); + assert.equal(record.resourceAttributes["service.name"], "t3-web"); + assert.equal(record.status?.code, String(span.status.code)); }).pipe(Effect.provide(NodeHttpServer.layerTest)), );