diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index cee08b4f98..ca4a2c22ef 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -1,6 +1,15 @@ import Mime from "@effect/platform-node/Mime"; -import { Effect, FileSystem, Option, Path } from "effect"; -import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"; +import { Data, Effect, FileSystem, Layer, Option, Path } from "effect"; +import { cast } from "effect/Function"; +import { + HttpBody, + HttpClient, + HttpClientResponse, + HttpRouter, + HttpServerResponse, + HttpServerRequest, +} from "effect/unstable/http"; +import { OtlpTracer } from "effect/unstable/observability"; import { ATTACHMENTS_ROUTE_PREFIX, @@ -9,10 +18,74 @@ import { } from "./attachmentPaths"; import { resolveAttachmentPathById } from "./attachmentStore"; import { ServerConfig } from "./config"; +import { decodeOtlpTraceRecords } from "./observability/TraceRecord.ts"; +import { BrowserTraceCollector } from "./observability/Services/BrowserTraceCollector.ts"; import { ProjectFaviconResolver } from "./project/Services/ProjectFaviconResolver"; const PROJECT_FAVICON_CACHE_CONTROL = "public, max-age=3600"; const FALLBACK_PROJECT_FAVICON_SVG = ``; +const OTLP_TRACES_PROXY_PATH = "/api/observability/v1/traces"; + +class DecodeOtlpTraceRecordsError extends Data.TaggedError("DecodeOtlpTraceRecordsError")<{ + readonly cause: unknown; + readonly bodyJson: OtlpTracer.TraceData; +}> {} + +export const otlpTracesProxyRouteLayer = HttpRouter.add( + "POST", + OTLP_TRACES_PROXY_PATH, + Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; + const config = yield* ServerConfig; + const otlpTracesUrl = config.otlpTracesUrl; + const browserTraceCollector = yield* BrowserTraceCollector; + const httpClient = yield* HttpClient.HttpClient; + const bodyJson = cast(yield* request.json); + + yield* Effect.try({ + try: () => decodeOtlpTraceRecords(bodyJson), + catch: (cause) => new DecodeOtlpTraceRecordsError({ cause, bodyJson }), + }).pipe( + Effect.flatMap((records) => browserTraceCollector.record(records)), + Effect.catch((cause) => + Effect.logWarning("Failed to decode browser OTLP traces", { + cause, + bodyJson, + }), + ), + ); + + if (otlpTracesUrl === undefined) { + return HttpServerResponse.empty({ status: 204 }); + } + + return yield* httpClient + .post(otlpTracesUrl, { + body: HttpBody.jsonUnsafe(bodyJson), + }) + .pipe( + Effect.flatMap(HttpClientResponse.filterStatusOk), + Effect.as(HttpServerResponse.empty({ status: 204 })), + Effect.tapError((cause) => + Effect.logWarning("Failed to export browser OTLP traces", { + cause, + otlpTracesUrl, + }), + ), + Effect.catch(() => + Effect.succeed(HttpServerResponse.text("Trace export failed.", { status: 502 })), + ), + ); + }), +).pipe( + Layer.provide( + HttpRouter.cors({ + allowedMethods: ["POST", "OPTIONS"], + allowedHeaders: ["content-type"], + maxAge: 600, + }), + ), +); export const attachmentsRouteLayer = HttpRouter.add( "GET", diff --git a/apps/server/src/observability/Layers/Observability.ts b/apps/server/src/observability/Layers/Observability.ts index 29b9d7eac5..7c6a28005f 100644 --- a/apps/server/src/observability/Layers/Observability.ts +++ b/apps/server/src/observability/Layers/Observability.ts @@ -4,6 +4,8 @@ import { OtlpMetrics, OtlpSerialization, OtlpTracer } from "effect/unstable/obse import { ServerConfig } from "../../config.ts"; import { ServerLoggerLive } from "../../serverLogger.ts"; import { makeLocalFileTracer } from "../LocalFileTracer.ts"; +import { BrowserTraceCollector } from "../Services/BrowserTraceCollector.ts"; +import { makeTraceSink } from "../TraceSink.ts"; const otlpSerializationLayer = OtlpSerialization.layerJson; @@ -16,9 +18,14 @@ export const ObservabilityLive = Layer.unwrap( Layer.succeed(References.TracerTimingEnabled, config.traceTimingEnabled), ); - const tracerLayer = Layer.effect( - Tracer.Tracer, + const tracerLayer = Layer.unwrap( Effect.gen(function* () { + const sink = yield* makeTraceSink({ + filePath: config.serverTracePath, + maxBytes: config.traceMaxBytes, + maxFiles: config.traceMaxFiles, + batchWindowMs: config.traceBatchWindowMs, + }); const delegate = config.otlpTracesUrl === undefined ? undefined @@ -34,13 +41,26 @@ export const ObservabilityLive = Layer.unwrap( }, }); - return yield* makeLocalFileTracer({ + const tracer = yield* makeLocalFileTracer({ filePath: config.serverTracePath, maxBytes: config.traceMaxBytes, maxFiles: config.traceMaxFiles, batchWindowMs: config.traceBatchWindowMs, + sink, ...(delegate ? { delegate } : {}), }); + + return Layer.mergeAll( + Layer.succeed(Tracer.Tracer, tracer), + Layer.succeed(BrowserTraceCollector, { + record: (records) => + Effect.sync(() => { + for (const record of records) { + sink.push(record); + } + }), + }), + ); }), ).pipe(Layer.provideMerge(otlpSerializationLayer)); diff --git a/apps/server/src/observability/LocalFileTracer.test.ts b/apps/server/src/observability/LocalFileTracer.test.ts index 1bc5a34305..19efffaf10 100644 --- a/apps/server/src/observability/LocalFileTracer.test.ts +++ b/apps/server/src/observability/LocalFileTracer.test.ts @@ -5,7 +5,7 @@ import path from "node:path"; import { assert, describe, it } from "@effect/vitest"; import { Effect, Layer, Logger, References, Tracer } from "effect"; -import type { TraceRecord } from "./TraceRecord.ts"; +import type { EffectTraceRecord } from "./TraceRecord.ts"; import { makeLocalFileTracer } from "./LocalFileTracer.ts"; const makeTestLayer = (tracePath: string) => @@ -23,13 +23,13 @@ const makeTestLayer = (tracePath: string) => Layer.succeed(References.MinimumLogLevel, "Info"), ); -const readTraceRecords = (tracePath: string): Array => +const readTraceRecords = (tracePath: string): Array => fs .readFileSync(tracePath, "utf8") .trim() .split("\n") .filter((line) => line.length > 0) - .map((line) => JSON.parse(line) as TraceRecord); + .map((line) => JSON.parse(line) as EffectTraceRecord); describe("LocalFileTracer", () => { it.effect("writes nested spans to disk and captures log messages as span events", () => diff --git a/apps/server/src/observability/LocalFileTracer.ts b/apps/server/src/observability/LocalFileTracer.ts index 7f75eb0bf9..cde5a176e8 100644 --- a/apps/server/src/observability/LocalFileTracer.ts +++ b/apps/server/src/observability/LocalFileTracer.ts @@ -1,8 +1,8 @@ import type * as Exit from "effect/Exit"; import { Effect, Option, Tracer } from "effect"; -import { spanToTraceRecord } from "./TraceRecord.ts"; -import { makeTraceSink } from "./TraceSink.ts"; +import { EffectTraceRecord, spanToTraceRecord } from "./TraceRecord.ts"; +import { makeTraceSink, type TraceSink } from "./TraceSink.ts"; export interface LocalFileTracerOptions { readonly filePath: string; @@ -10,6 +10,7 @@ export interface LocalFileTracerOptions { readonly maxFiles: number; readonly batchWindowMs: number; readonly delegate?: Tracer.Tracer; + readonly sink?: TraceSink; } class LocalFileSpan implements Tracer.Span { @@ -30,7 +31,7 @@ class LocalFileSpan implements Tracer.Span { constructor( options: Parameters[0], private readonly delegate: Tracer.Span, - private readonly push: (record: ReturnType) => void, + private readonly push: (record: EffectTraceRecord) => void, ) { this.name = delegate.name; this.spanId = delegate.spanId; @@ -82,12 +83,14 @@ class LocalFileSpan implements Tracer.Span { export const makeLocalFileTracer = Effect.fn("makeLocalFileTracer")(function* ( options: LocalFileTracerOptions, ) { - const sink = yield* makeTraceSink({ - filePath: options.filePath, - maxBytes: options.maxBytes, - maxFiles: options.maxFiles, - batchWindowMs: options.batchWindowMs, - }); + const sink = + options.sink ?? + (yield* makeTraceSink({ + filePath: options.filePath, + maxBytes: options.maxBytes, + maxFiles: options.maxFiles, + batchWindowMs: options.batchWindowMs, + })); const delegate = options.delegate ?? diff --git a/apps/server/src/observability/Services/BrowserTraceCollector.ts b/apps/server/src/observability/Services/BrowserTraceCollector.ts new file mode 100644 index 0000000000..60e8f5fa13 --- /dev/null +++ b/apps/server/src/observability/Services/BrowserTraceCollector.ts @@ -0,0 +1,13 @@ +import { ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { TraceRecord } from "../TraceRecord.ts"; + +export interface BrowserTraceCollectorShape { + readonly record: (records: ReadonlyArray) => Effect.Effect; +} + +export class BrowserTraceCollector extends ServiceMap.Service< + BrowserTraceCollector, + BrowserTraceCollectorShape +>()("t3/observability/Services/BrowserTraceCollector") {} diff --git a/apps/server/src/observability/TraceRecord.ts b/apps/server/src/observability/TraceRecord.ts index 3aee1b267c..a9a598288b 100644 --- a/apps/server/src/observability/TraceRecord.ts +++ b/apps/server/src/observability/TraceRecord.ts @@ -1,29 +1,37 @@ import { Cause, Exit, Option, Tracer } from "effect"; import { compactTraceAttributes } from "./Attributes.ts"; +import { OtlpResource, OtlpTracer } from "effect/unstable/observability"; -export interface TraceRecord { - readonly type: "effect-span"; +interface TraceRecordEvent { + readonly name: string; + readonly timeUnixNano: string; + readonly attributes: Readonly>; +} + +interface TraceRecordLink { + readonly traceId: string; + readonly spanId: string; + readonly attributes: Readonly>; +} + +interface BaseTraceRecord { readonly name: string; + readonly kind: string; readonly traceId: string; readonly spanId: string; readonly parentSpanId?: string; readonly sampled: boolean; - readonly kind: Tracer.SpanKind; readonly startTimeUnixNano: string; readonly endTimeUnixNano: string; readonly durationMs: number; readonly attributes: Readonly>; - readonly events: ReadonlyArray<{ - readonly name: string; - readonly timeUnixNano: string; - readonly attributes: Readonly>; - }>; - readonly links: ReadonlyArray<{ - readonly traceId: string; - readonly spanId: string; - readonly attributes: Readonly>; - }>; + readonly events: ReadonlyArray; + readonly links: ReadonlyArray; +} + +export interface EffectTraceRecord extends BaseTraceRecord { + readonly type: "effect-span"; readonly exit: | { readonly _tag: "Success"; @@ -38,6 +46,29 @@ export interface TraceRecord { }; } +interface OtlpTraceRecord extends BaseTraceRecord { + readonly type: "otlp-span"; + readonly resourceAttributes: Readonly>; + readonly scope: Readonly<{ + readonly name?: string; + readonly version?: string; + readonly attributes: Readonly>; + }>; + readonly status?: + | { + readonly code?: string; + readonly message?: string; + } + | undefined; +} + +export type TraceRecord = EffectTraceRecord | OtlpTraceRecord; + +type OtlpSpan = OtlpTracer.ScopeSpan["spans"][number]; +type OtlpSpanEvent = OtlpSpan["events"][number]; +type OtlpSpanLink = OtlpSpan["links"][number]; +type OtlpSpanStatus = OtlpSpan["status"]; + interface SerializableSpan { readonly name: string; readonly traceId: string; @@ -53,7 +84,7 @@ interface SerializableSpan { >; } -function formatTraceExit(exit: Exit.Exit): TraceRecord["exit"] { +function formatTraceExit(exit: Exit.Exit): EffectTraceRecord["exit"] { if (Exit.isSuccess(exit)) { return { _tag: "Success" }; } @@ -69,7 +100,7 @@ function formatTraceExit(exit: Exit.Exit): TraceRecord["exit"] }; } -export function spanToTraceRecord(span: SerializableSpan): TraceRecord { +export function spanToTraceRecord(span: SerializableSpan): EffectTraceRecord { const status = span.status as Extract; const parentSpanId = Option.getOrUndefined(span.parent)?.spanId; @@ -98,3 +129,159 @@ export function spanToTraceRecord(span: SerializableSpan): TraceRecord { exit: formatTraceExit(status.exit), }; } + +const SPAN_KIND_MAP: Record = { + 1: "internal", + 2: "server", + 3: "client", + 4: "producer", + 5: "consumer", +}; + +export function decodeOtlpTraceRecords( + payload: OtlpTracer.TraceData, +): ReadonlyArray { + const records: Array = []; + + for (const resourceSpan of payload.resourceSpans) { + const resourceAttributes = decodeAttributes(resourceSpan.resource?.attributes ?? []); + + for (const scopeSpan of resourceSpan.scopeSpans) { + for (const span of scopeSpan.spans) { + records.push( + otlpSpanToTraceRecord({ + resourceAttributes, + scopeAttributes: decodeAttributes( + "attributes" in scopeSpan.scope && Array.isArray(scopeSpan.scope.attributes) + ? scopeSpan.scope.attributes + : [], + ), + scopeName: scopeSpan.scope.name, + scopeVersion: + "version" in scopeSpan.scope && typeof scopeSpan.scope.version === "string" + ? scopeSpan.scope.version + : undefined, + span, + }), + ); + } + } + } + + return records; +} + +function otlpSpanToTraceRecord(input: { + readonly resourceAttributes: Readonly>; + readonly scopeAttributes: Readonly>; + readonly scopeName: string | undefined; + readonly scopeVersion: string | undefined; + readonly span: OtlpSpan; +}): OtlpTraceRecord { + return { + type: "otlp-span", + name: input.span.name, + traceId: input.span.traceId, + spanId: input.span.spanId, + ...(input.span.parentSpanId ? { parentSpanId: input.span.parentSpanId } : {}), + sampled: true, + kind: normalizeSpanKind(input.span.kind), + startTimeUnixNano: input.span.startTimeUnixNano, + endTimeUnixNano: input.span.endTimeUnixNano, + durationMs: + Number(parseBigInt(input.span.endTimeUnixNano) - parseBigInt(input.span.startTimeUnixNano)) / + 1_000_000, + attributes: decodeAttributes(input.span.attributes), + resourceAttributes: input.resourceAttributes, + scope: { + ...(input.scopeName ? { name: input.scopeName } : {}), + ...(input.scopeVersion ? { version: input.scopeVersion } : {}), + attributes: input.scopeAttributes, + }, + events: decodeEvents(input.span.events), + links: decodeLinks(input.span.links), + status: decodeStatus(input.span.status), + }; +} + +function decodeStatus(input: OtlpSpanStatus): OtlpTraceRecord["status"] { + const code = String(input.code); + const message = input.message; + + return { + code, + ...(message ? { message } : {}), + }; +} + +function decodeEvents(input: ReadonlyArray): ReadonlyArray { + return input.map((current) => ({ + name: current.name, + timeUnixNano: current.timeUnixNano, + attributes: decodeAttributes(current.attributes), + })); +} + +function decodeLinks(input: ReadonlyArray): ReadonlyArray { + return input.flatMap((current) => { + const traceId = current.traceId; + const spanId = current.spanId; + return { + traceId, + spanId, + attributes: decodeAttributes(current.attributes), + }; + }); +} + +function decodeAttributes( + input: ReadonlyArray, +): Readonly> { + const entries: Record = {}; + + for (const attribute of input) { + entries[attribute.key] = decodeValue(attribute.value); + } + + return compactTraceAttributes(entries); +} + +function decodeValue(input: OtlpResource.AnyValue | null | undefined): unknown { + if (input == null) { + return null; + } + if ("stringValue" in input) { + return input.stringValue; + } + if ("boolValue" in input) { + return input.boolValue; + } + if ("intValue" in input) { + return input.intValue; + } + if ("doubleValue" in input) { + return input.doubleValue; + } + if ("bytesValue" in input) { + return input.bytesValue; + } + if (input.arrayValue) { + return input.arrayValue.values.map((entry) => decodeValue(entry)); + } + if (input.kvlistValue) { + return decodeAttributes(input.kvlistValue.values); + } + return null; +} + +function normalizeSpanKind(input: number): OtlpTraceRecord["kind"] { + return SPAN_KIND_MAP[input] || "internal"; +} + +function parseBigInt(input: string): bigint { + try { + return BigInt(input); + } catch { + return 0n; + } +} diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 3d8e36e0a7..7a23058fc7 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -1,6 +1,7 @@ import * as NodeHttpServer from "@effect/platform-node/NodeHttpServer"; import * as NodeSocket from "@effect/platform-node/NodeSocket"; import * as NodeServices from "@effect/platform-node/NodeServices"; + import { CommandId, DEFAULT_SERVER_SETTINGS, @@ -21,8 +22,15 @@ import { } from "@t3tools/contracts"; import { assert, it } from "@effect/vitest"; import { assertFailure, assertInclude, assertTrue } from "@effect/vitest/utils"; -import { Effect, FileSystem, Layer, Path, Stream } from "effect"; -import { HttpClient, HttpRouter, HttpServer } from "effect/unstable/http"; +import { Effect, FileSystem, Layer, ManagedRuntime, Path, Stream } from "effect"; +import { + FetchHttpClient, + HttpBody, + HttpClient, + HttpRouter, + HttpServer, +} from "effect/unstable/http"; +import { OtlpSerialization, OtlpTracer } from "effect/unstable/observability"; import { RpcClient, RpcSerialization } from "effect/unstable/rpc"; import { vi } from "vitest"; @@ -56,6 +64,10 @@ import { ServerLifecycleEvents, type ServerLifecycleEventsShape } from "./server import { ServerRuntimeStartup, type ServerRuntimeStartupShape } from "./serverRuntimeStartup.ts"; import { ServerSettingsService, type ServerSettingsShape } from "./serverSettings.ts"; import { TerminalManager, type TerminalManagerShape } from "./terminal/Services/Manager.ts"; +import { + BrowserTraceCollector, + type BrowserTraceCollectorShape, +} from "./observability/Services/BrowserTraceCollector.ts"; import { ProjectFaviconResolverLive } from "./project/Layers/ProjectFaviconResolver.ts"; import { ProjectSetupScriptRunner, @@ -124,6 +136,113 @@ const workspaceAndProjectServicesLayer = Layer.mergeAll( ProjectFaviconResolverLive, ); +const browserOtlpTracingLayer = Layer.mergeAll( + FetchHttpClient.layer, + OtlpSerialization.layerJson, + Layer.succeed(HttpClient.TracerDisabledWhen, () => true), +); + +const makeBrowserOtlpPayload = (spanName: string) => + Effect.gen(function* () { + const collector = yield* Effect.acquireRelease( + Effect.promise(async () => { + const NodeHttp = await import("node:http"); + + return await new Promise<{ + readonly close: () => Promise; + readonly firstRequest: Promise<{ + readonly body: string; + readonly contentType: string | null; + }>; + readonly url: string; + }>((resolve, reject) => { + let resolveFirstRequest: + | ((request: { readonly body: string; readonly contentType: string | null }) => void) + | undefined; + const firstRequest = new Promise<{ + readonly body: string; + readonly contentType: string | null; + }>((resolveRequest) => { + resolveFirstRequest = resolveRequest; + }); + + const server = NodeHttp.createServer((request, response) => { + const chunks: Buffer[] = []; + request.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + request.on("end", () => { + resolveFirstRequest?.({ + body: Buffer.concat(chunks).toString("utf8"), + contentType: request.headers["content-type"] ?? null, + }); + resolveFirstRequest = undefined; + response.statusCode = 204; + response.end(); + }); + }); + + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("Expected TCP collector address")); + return; + } + + resolve({ + url: `http://127.0.0.1:${address.port}/v1/traces`, + firstRequest, + close: () => + new Promise((resolveClose, rejectClose) => { + server.close((error) => { + if (error) { + rejectClose(error); + return; + } + resolveClose(); + }); + }), + }); + }); + }); + }), + ({ close }) => Effect.promise(close), + ); + + const runtime = ManagedRuntime.make( + OtlpTracer.layer({ + url: collector.url, + exportInterval: "10 millis", + resource: { + serviceName: "t3-web", + attributes: { + "service.runtime": "t3-web", + "service.mode": "browser", + "service.version": "test", + }, + }, + }).pipe(Layer.provide(browserOtlpTracingLayer)), + ); + + try { + yield* Effect.promise(() => runtime.runPromise(Effect.void.pipe(Effect.withSpan(spanName)))); + } finally { + yield* Effect.promise(() => runtime.dispose()); + } + + const request = yield* Effect.promise(() => + Promise.race([ + collector.firstRequest, + new Promise((_, reject) => { + setTimeout(() => reject(new Error("Timed out waiting for OTLP trace export")), 1_000); + }), + ]), + ); + + return JSON.parse(request.body) as OtlpTracer.TraceData; + }); + const buildAppUnderTest = (options?: { config?: Partial; layers?: { @@ -138,6 +257,7 @@ const buildAppUnderTest = (options?: { orchestrationEngine?: Partial; projectionSnapshotQuery?: Partial; checkpointDiffQuery?: Partial; + browserTraceCollector?: Partial; serverLifecycleEvents?: Partial; serverRuntimeStartup?: Partial; }; @@ -263,6 +383,12 @@ const buildAppUnderTest = (options?: { ...options?.layers?.checkpointDiffQuery, }), ), + Layer.provide( + Layer.mock(BrowserTraceCollector)({ + record: () => Effect.void, + ...options?.layers?.browserTraceCollector, + }), + ), Layer.provide( Layer.mock(ServerLifecycleEvents)({ publish: (event) => Effect.succeed({ ...(event as any), sequence: 1 }), @@ -280,6 +406,7 @@ const buildAppUnderTest = (options?: { }), ), Layer.provide(workspaceAndProjectServicesLayer), + Layer.provideMerge(FetchHttpClient.layer), Layer.provide(layerConfig), ); @@ -437,6 +564,285 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); + it.effect("proxies browser OTLP trace exports through the server", () => + Effect.gen(function* () { + const upstreamRequests: Array<{ + readonly body: string; + readonly contentType: string | null; + }> = []; + const localTraceRecords: Array = []; + const payload = { + resourceSpans: [ + { + resource: { + attributes: [ + { + key: "service.name", + value: { stringValue: "t3-web" }, + }, + ], + }, + scopeSpans: [ + { + scope: { + name: "effect", + version: "4.0.0-beta.43", + }, + spans: [ + { + traceId: "11111111111111111111111111111111", + spanId: "2222222222222222", + parentSpanId: "3333333333333333", + name: "RpcClient.server.getSettings", + kind: 3, + startTimeUnixNano: "1000000", + endTimeUnixNano: "2000000", + attributes: [ + { + key: "rpc.method", + value: { stringValue: "server.getSettings" }, + }, + ], + events: [ + { + name: "http.request", + timeUnixNano: "1500000", + attributes: [ + { + key: "http.status_code", + value: { intValue: "200" }, + }, + ], + }, + ], + links: [], + status: { + code: "STATUS_CODE_OK", + }, + flags: 1, + }, + ], + }, + ], + }, + ], + }; + + const collector = yield* Effect.acquireRelease( + Effect.promise(async () => { + const NodeHttp = await import("node:http"); + + return await new Promise<{ + readonly close: () => Promise; + readonly url: string; + }>((resolve, reject) => { + const server = NodeHttp.createServer((request, response) => { + const chunks: Buffer[] = []; + request.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + request.on("end", () => { + upstreamRequests.push({ + body: Buffer.concat(chunks).toString("utf8"), + contentType: request.headers["content-type"] ?? null, + }); + response.statusCode = 204; + response.end(); + }); + }); + + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("Expected TCP collector address")); + return; + } + + resolve({ + url: `http://127.0.0.1:${address.port}/v1/traces`, + close: () => + new Promise((resolveClose, rejectClose) => { + server.close((error) => { + if (error) { + rejectClose(error); + return; + } + resolveClose(); + }); + }), + }); + }); + }); + }), + ({ close }) => Effect.promise(close), + ); + + yield* buildAppUnderTest({ + config: { + otlpTracesUrl: collector.url, + }, + layers: { + browserTraceCollector: { + record: (records) => + Effect.sync(() => { + localTraceRecords.push(...records); + }), + }, + }, + }); + + const response = yield* HttpClient.post("/api/observability/v1/traces", { + headers: { + "content-type": "application/json", + origin: "http://localhost:5733", + }, + body: HttpBody.text(JSON.stringify(payload), "application/json"), + }); + + assert.equal(response.status, 204); + assert.equal(response.headers["access-control-allow-origin"], "*"); + assert.deepEqual(localTraceRecords, [ + { + type: "otlp-span", + name: "RpcClient.server.getSettings", + traceId: "11111111111111111111111111111111", + spanId: "2222222222222222", + parentSpanId: "3333333333333333", + sampled: true, + kind: "client", + startTimeUnixNano: "1000000", + endTimeUnixNano: "2000000", + durationMs: 1, + attributes: { + "rpc.method": "server.getSettings", + }, + resourceAttributes: { + "service.name": "t3-web", + }, + scope: { + name: "effect", + version: "4.0.0-beta.43", + attributes: {}, + }, + events: [ + { + name: "http.request", + timeUnixNano: "1500000", + attributes: { + "http.status_code": "200", + }, + }, + ], + links: [], + status: { + code: "STATUS_CODE_OK", + }, + }, + ]); + assert.deepEqual(upstreamRequests, [ + { + body: JSON.stringify(payload), + contentType: "application/json", + }, + ]); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + + it.effect("responds to browser OTLP trace preflight requests with CORS headers", () => + Effect.gen(function* () { + yield* buildAppUnderTest(); + + const url = yield* getHttpServerUrl("/api/observability/v1/traces"); + const response = yield* Effect.promise(() => + fetch(url, { + method: "OPTIONS", + headers: { + origin: "http://localhost:5733", + "access-control-request-method": "POST", + "access-control-request-headers": "content-type", + }, + }), + ); + + assert.equal(response.status, 204); + assert.equal(response.headers.get("access-control-allow-origin"), "*"); + assert.equal(response.headers.get("access-control-allow-methods"), "POST, OPTIONS"); + assert.equal(response.headers.get("access-control-allow-headers"), "content-type"); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + + it.effect( + "stores browser OTLP trace exports locally when no upstream collector is configured", + () => + Effect.gen(function* () { + const localTraceRecords: Array = []; + const payload = yield* makeBrowserOtlpPayload("client.test"); + const resourceSpan = payload.resourceSpans[0]; + const scopeSpan = resourceSpan?.scopeSpans[0]; + const span = scopeSpan?.spans[0]; + + assert.notEqual(resourceSpan, undefined); + assert.notEqual(scopeSpan, undefined); + assert.notEqual(span, undefined); + if (!resourceSpan || !scopeSpan || !span) { + return; + } + + yield* buildAppUnderTest({ + layers: { + browserTraceCollector: { + record: (records) => + Effect.sync(() => { + localTraceRecords.push(...records); + }), + }, + }, + }); + + const response = yield* HttpClient.post("/api/observability/v1/traces", { + headers: { + "content-type": "application/json", + }, + body: HttpBody.text(JSON.stringify(payload), "application/json"), + }); + + assert.equal(response.status, 204); + assert.equal(localTraceRecords.length, 1); + const record = localTraceRecords[0] as { + readonly type: string; + readonly name: string; + readonly traceId: string; + readonly spanId: string; + readonly kind: string; + readonly attributes: Readonly>; + readonly events: ReadonlyArray; + readonly links: ReadonlyArray; + readonly scope: { + readonly name?: string; + readonly attributes: Readonly>; + }; + readonly resourceAttributes: Readonly>; + readonly status?: { + readonly code?: string; + }; + }; + + assert.equal(record.type, "otlp-span"); + assert.equal(record.name, span.name); + assert.equal(record.traceId, span.traceId); + assert.equal(record.spanId, span.spanId); + assert.equal(record.kind, "internal"); + assert.deepEqual(record.attributes, {}); + assert.deepEqual(record.events, []); + assert.deepEqual(record.links, []); + assert.equal(record.scope.name, scopeSpan.scope.name); + assert.deepEqual(record.scope.attributes, {}); + assert.equal(record.resourceAttributes["service.name"], "t3-web"); + assert.equal(record.status?.code, String(span.status.code)); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + it.effect("returns 404 for missing attachment id lookups", () => Effect.gen(function* () { yield* buildAppUnderTest(); diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 04ffaeeeeb..f56edde6fa 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -2,7 +2,12 @@ import { Effect, Layer } from "effect"; import { FetchHttpClient, HttpRouter, HttpServer } from "effect/unstable/http"; import { ServerConfig } from "./config"; -import { attachmentsRouteLayer, projectFaviconRouteLayer, staticAndDevRouteLayer } from "./http"; +import { + attachmentsRouteLayer, + otlpTracesProxyRouteLayer, + projectFaviconRouteLayer, + staticAndDevRouteLayer, +} from "./http"; import { fixPath } from "./os-jank"; import { websocketRpcRouteLayer } from "./ws"; import { OpenLive } from "./open"; @@ -205,6 +210,7 @@ const RuntimeServicesLive = ServerRuntimeStartupLive.pipe( export const makeRoutesLayer = Layer.mergeAll( attachmentsRouteLayer, + otlpTracesProxyRouteLayer, projectFaviconRouteLayer, staticAndDevRouteLayer, websocketRpcRouteLayer, diff --git a/apps/web/src/observability/clientTracing.ts b/apps/web/src/observability/clientTracing.ts new file mode 100644 index 0000000000..aa9d50c114 --- /dev/null +++ b/apps/web/src/observability/clientTracing.ts @@ -0,0 +1,147 @@ +import { Exit, Layer, ManagedRuntime, Scope, Tracer } from "effect"; +import { FetchHttpClient, HttpClient } from "effect/unstable/http"; +import { OtlpSerialization, OtlpTracer } from "effect/unstable/observability"; + +import { isElectron } from "../env"; +import { resolveServerUrl } from "../lib/utils"; +import { APP_VERSION } from "~/branding"; + +const DEFAULT_EXPORT_INTERVAL_MS = 1_000; +const CLIENT_TRACING_RESOURCE = { + serviceName: "t3-web", + attributes: { + "service.runtime": "t3-web", + "service.mode": isElectron ? "electron" : "browser", + "service.version": APP_VERSION, + }, +} as const; + +const delegateRuntimeLayer = Layer.mergeAll( + FetchHttpClient.layer, + OtlpSerialization.layerJson, + Layer.succeed(HttpClient.TracerDisabledWhen, () => true), +); + +let activeDelegate: Tracer.Tracer | null = null; +let activeRuntime: ManagedRuntime.ManagedRuntime | null = null; +let activeScope: Scope.Closeable | null = null; +let activeConfigKey: string | null = null; +let configurationGeneration = 0; +let pendingConfiguration = Promise.resolve(); + +export interface ClientTracingConfig { + readonly exportIntervalMs?: number; +} + +export const ClientTracingLive = Layer.succeed( + Tracer.Tracer, + Tracer.make({ + span(options) { + return activeDelegate?.span(options) ?? new Tracer.NativeSpan(options); + }, + }), +); + +export function configureClientTracing(config: ClientTracingConfig = {}): Promise { + if (config.exportIntervalMs === undefined && activeConfigKey !== null) { + return pendingConfiguration; + } + pendingConfiguration = pendingConfiguration.finally(() => applyClientTracingConfig(config)); + return pendingConfiguration; +} + +async function applyClientTracingConfig(config: ClientTracingConfig): Promise { + const otlpTracesUrl = resolveServerUrl({ + protocol: window.location.protocol === "https:" ? "https" : "http", + pathname: "/api/observability/v1/traces", + }); + const exportIntervalMs = Math.max(10, config.exportIntervalMs ?? DEFAULT_EXPORT_INTERVAL_MS); + const nextConfigKey = `${otlpTracesUrl}|${exportIntervalMs}`; + + if (activeConfigKey === nextConfigKey && activeDelegate !== null) { + return; + } + + activeConfigKey = nextConfigKey; + const generation = ++configurationGeneration; + + const previousRuntime = activeRuntime; + const previousScope = activeScope; + + activeDelegate = null; + activeRuntime = null; + activeScope = null; + + await disposeTracerRuntime(previousRuntime, previousScope); + + const runtime = ManagedRuntime.make(delegateRuntimeLayer); + const scope = runtime.runSync(Scope.make()); + + try { + const delegate = await runtime.runPromise( + Scope.provide(scope)( + OtlpTracer.make({ + url: otlpTracesUrl, + exportInterval: `${exportIntervalMs} millis`, + resource: CLIENT_TRACING_RESOURCE, + }), + ), + ); + + if (generation !== configurationGeneration) { + await disposeTracerRuntime(runtime, scope); + return; + } + + activeDelegate = delegate; + activeRuntime = runtime; + activeScope = scope; + } catch (error) { + await disposeTracerRuntime(runtime, scope); + + if (generation === configurationGeneration) { + console.warn("Failed to configure client tracing exporter", { + error: formatError(error), + otlpTracesUrl, + }); + } + } +} + +async function disposeTracerRuntime( + runtime: ManagedRuntime.ManagedRuntime | null, + scope: Scope.Closeable | null, +): Promise { + if (runtime === null || scope === null) { + return; + } + + await runtime + .runPromise(Scope.close(scope, Exit.void)) + .catch(() => undefined) + .finally(() => { + runtime.dispose(); + }); +} + +function formatError(error: unknown): string { + if (error instanceof Error && error.message.trim().length > 0) { + return error.message; + } + + return String(error); +} + +export async function __resetClientTracingForTests() { + configurationGeneration++; + activeConfigKey = null; + activeDelegate = null; + pendingConfiguration = Promise.resolve(); + + const runtime = activeRuntime; + const scope = activeScope; + activeRuntime = null; + activeScope = null; + + await disposeTracerRuntime(runtime, scope); +} diff --git a/apps/web/src/rpc/client.test.ts b/apps/web/src/rpc/client.test.ts index c19b85dd36..8e3e1a281d 100644 --- a/apps/web/src/rpc/client.test.ts +++ b/apps/web/src/rpc/client.test.ts @@ -2,6 +2,10 @@ import { DEFAULT_SERVER_SETTINGS, WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { AsyncResult, AtomRegistry } from "effect/unstable/reactivity"; +import { + configureClientTracing, + __resetClientTracingForTests, +} from "../observability/clientTracing"; import { __resetWsRpcAtomClientForTests, runRpc, WsRpcAtomClient } from "./client"; type WsEventType = "open" | "message" | "close" | "error"; @@ -66,6 +70,7 @@ class MockWebSocket { } const originalWebSocket = globalThis.WebSocket; +const originalFetch = globalThis.fetch; function getSocket(): MockWebSocket { const socket = sockets.at(-1); @@ -112,6 +117,7 @@ beforeEach(() => { afterEach(() => { __resetWsRpcAtomClientForTests(); globalThis.WebSocket = originalWebSocket; + globalThis.fetch = originalFetch; vi.restoreAllMocks(); }); @@ -228,4 +234,46 @@ describe("WsRpcAtomClient", () => { release(); registry.dispose(); }); + + it("attaches distributed trace ids when client OTLP tracing is enabled", async () => { + await configureClientTracing({ + exportIntervalMs: 10, + }); + + const requestPromise = runRpc((client) => client(WS_METHODS.serverGetSettings, {})); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(socket.sent[0] ?? "{}") as { + id: string; + spanId?: string; + tag: string; + traceId?: string; + }; + expect(requestMessage.tag).toBe(WS_METHODS.serverGetSettings); + expect(requestMessage.traceId).toMatch(/^[0-9a-f]{32}$/); + expect(requestMessage.spanId).toMatch(/^[0-9a-f]{16}$/); + + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: DEFAULT_SERVER_SETTINGS, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual(DEFAULT_SERVER_SETTINGS); + }); }); diff --git a/apps/web/src/rpc/client.ts b/apps/web/src/rpc/client.ts index 117f7c0aaf..e4f9a6bbdd 100644 --- a/apps/web/src/rpc/client.ts +++ b/apps/web/src/rpc/client.ts @@ -2,6 +2,11 @@ import { WsRpcGroup } from "@t3tools/contracts"; import { Effect, Layer, ManagedRuntime } from "effect"; import { AtomRpc } from "effect/unstable/reactivity"; +import { + __resetClientTracingForTests, + ClientTracingLive, + configureClientTracing, +} from "../observability/clientTracing"; import { createWsRpcProtocolLayer } from "./protocol"; export class WsRpcAtomClient extends AtomRpc.Service()("WsRpcAtomClient", { @@ -16,18 +21,22 @@ function getRuntime() { return sharedRuntime; } - sharedRuntime = ManagedRuntime.make(WsRpcAtomClient.layer); + sharedRuntime = ManagedRuntime.make(Layer.mergeAll(WsRpcAtomClient.layer, ClientTracingLive)); return sharedRuntime; } export function runRpc( execute: (client: typeof WsRpcAtomClient.Service) => Effect.Effect, ): Promise { - return getRuntime().runPromise(WsRpcAtomClient.use(execute)); + return configureClientTracing().then(() => { + const runtime = getRuntime(); + return runtime.runPromise(WsRpcAtomClient.use(execute)); + }); } export async function __resetWsRpcAtomClientForTests() { const runtime = sharedRuntime; sharedRuntime = null; await runtime?.dispose(); + await __resetClientTracingForTests(); } diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index 2f64f2a692..6f4a5d1943 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -1,6 +1,10 @@ -import { WS_METHODS } from "@t3tools/contracts"; +import { DEFAULT_SERVER_SETTINGS, WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + __resetClientTracingForTests, + configureClientTracing, +} from "./observability/clientTracing"; import { WsTransport } from "./wsTransport"; type WsEventType = "open" | "message" | "close" | "error"; @@ -63,6 +67,7 @@ class MockWebSocket { } const originalWebSocket = globalThis.WebSocket; +const originalFetch = globalThis.fetch; function getSocket(): MockWebSocket { const socket = sockets.at(-1); @@ -108,6 +113,8 @@ beforeEach(() => { afterEach(() => { globalThis.WebSocket = originalWebSocket; + globalThis.fetch = originalFetch; + void __resetClientTracingForTests(); vi.restoreAllMocks(); }); @@ -440,4 +447,46 @@ describe("WsTransport", () => { expect(callOrder).toEqual(["close:start", "close:done", "runtime:dispose"]); }); + + it("propagates OTLP trace ids for ws transport requests when client tracing is enabled", async () => { + await configureClientTracing({ + exportIntervalMs: 10, + }); + + const transport = new WsTransport("ws://localhost:3020"); + const requestPromise = transport.request((client) => client[WS_METHODS.serverGetSettings]({})); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(socket.sent[0] ?? "{}") as { + id: string; + spanId?: string; + traceId?: string; + }; + expect(requestMessage.traceId).toMatch(/^[0-9a-f]{32}$/); + expect(requestMessage.spanId).toMatch(/^[0-9a-f]{16}$/); + + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: DEFAULT_SERVER_SETTINGS, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual(DEFAULT_SERVER_SETTINGS); + await transport.dispose(); + }); }); diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index 70042261d5..ee33c5134b 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -1,4 +1,4 @@ -import { Duration, Effect, Exit, ManagedRuntime, Option, Scope, Stream } from "effect"; +import { Duration, Effect, Exit, Layer, ManagedRuntime, Option, Scope, Stream } from "effect"; import { createWsRpcProtocolLayer, @@ -6,6 +6,7 @@ import { type WsRpcProtocolClient, } from "./rpc/protocol"; import { RpcClient } from "effect/unstable/rpc"; +import { ClientTracingLive, configureClientTracing } from "./observability/clientTracing"; interface SubscribeOptions { readonly retryDelay?: Duration.Input; @@ -28,10 +29,14 @@ export class WsTransport { private readonly runtime: ManagedRuntime.ManagedRuntime; private readonly clientScope: Scope.Closeable; private readonly clientPromise: Promise; + private readonly tracingReady: Promise; private disposed = false; constructor(url?: string) { - this.runtime = ManagedRuntime.make(createWsRpcProtocolLayer(url)); + this.tracingReady = configureClientTracing(); + this.runtime = ManagedRuntime.make( + Layer.mergeAll(createWsRpcProtocolLayer(url), ClientTracingLive), + ); this.clientScope = this.runtime.runSync(Scope.make()); this.clientPromise = this.runtime.runPromise( Scope.provide(this.clientScope)(makeWsRpcProtocolClient), @@ -46,6 +51,7 @@ export class WsTransport { throw new Error("Transport disposed"); } + await this.tracingReady; const client = await this.clientPromise; return await this.runtime.runPromise(Effect.suspend(() => execute(client))); } @@ -58,6 +64,7 @@ export class WsTransport { throw new Error("Transport disposed"); } + await this.tracingReady; const client = await this.clientPromise; await this.runtime.runPromise( Stream.runForEach(connect(client), (value) => @@ -84,7 +91,8 @@ export class WsTransport { let active = true; const retryDelayMs = options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS; const cancel = this.runtime.runCallback( - Effect.promise(() => this.clientPromise).pipe( + Effect.promise(() => this.tracingReady).pipe( + Effect.flatMap(() => Effect.promise(() => this.clientPromise)), Effect.flatMap((client) => Stream.runForEach(connect(client), (value) => Effect.sync(() => {