Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions apps/server/src/audit/Services/AuditLogService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* AuditLogService - Service interface and Live layer for structured audit logging.
*
* Records security-relevant and operational events with actor, category,
* and severity metadata. Supports paginated queries and live streaming.
*
* @module AuditLogService
*/
import type {
AuditEntry,
AuditQueryInput,
AuditQueryResult,
AuditStreamEvent,
} from "@t3tools/contracts";
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect";
import * as SqlClient from "effect/unstable/sql/SqlClient";

export interface AuditLogServiceShape {
/**
* Record a structured audit entry.
*
* Persists to the audit_log table and publishes to the live event stream.
*/
readonly record: (entry: {
readonly actor: AuditEntry["actor"];
readonly actorId: string | null;
readonly category: AuditEntry["category"];
readonly action: string;
readonly severity: AuditEntry["severity"];
readonly projectId: string | null;
readonly threadId: string | null;
readonly commandId: string | null;
readonly eventId: string | null;
readonly summary: string;
readonly detail: string | null;
readonly metadata: Record<string, unknown>;
}) => Effect.Effect<AuditEntry>;

/**
* Query audit entries with filters and pagination.
*/
readonly query: (input: AuditQueryInput) => Effect.Effect<AuditQueryResult>;

/**
* Live stream of new audit entries.
*
* Each access creates a fresh PubSub subscription so multiple consumers
* independently receive all events.
*/
readonly streamEvents: Stream.Stream<AuditStreamEvent>;
}

export class AuditLogService extends ServiceMap.Service<AuditLogService, AuditLogServiceShape>()(
"t3/audit/Services/AuditLogService",
) {}

const makeAuditLogService = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
const pubsub = yield* PubSub.unbounded<AuditStreamEvent>();

const record: AuditLogServiceShape["record"] = (input) =>
Effect.gen(function* () {
const id = crypto.randomUUID();
const now = new Date().toISOString();
const metadataJson = JSON.stringify(input.metadata);

yield* sql`INSERT INTO audit_log (id, timestamp, actor, actor_id, category, action, severity, project_id, thread_id, command_id, event_id, summary, detail, metadata)
VALUES (${id}, ${now}, ${input.actor}, ${input.actorId}, ${input.category}, ${input.action}, ${input.severity}, ${input.projectId}, ${input.threadId}, ${input.commandId}, ${input.eventId}, ${input.summary}, ${input.detail}, ${metadataJson})`;

const entry: AuditEntry = {
id: id as AuditEntry["id"],
timestamp: now,
actor: input.actor,
actorId: (input.actorId ?? null) as AuditEntry["actorId"],
category: input.category,
action: input.action as AuditEntry["action"],
severity: input.severity,
projectId: (input.projectId ?? null) as AuditEntry["projectId"],
threadId: (input.threadId ?? null) as AuditEntry["threadId"],
commandId: (input.commandId ?? null) as AuditEntry["commandId"],
eventId: (input.eventId ?? null) as AuditEntry["eventId"],
summary: input.summary as AuditEntry["summary"],
detail: (input.detail ?? null) as AuditEntry["detail"],
metadata: input.metadata,
};

yield* PubSub.publish(pubsub, { type: "audit.entry" as const, entry });
return entry;
}).pipe(Effect.orDie);

const query: AuditLogServiceShape["query"] = (input) =>
Effect.gen(function* () {
const conditions: Array<string> = [];
const params: Array<string | number> = [];

if (input.projectId) {
conditions.push("project_id = ?");
params.push(input.projectId);
}
if (input.threadId) {
conditions.push("thread_id = ?");
params.push(input.threadId);
}
if (input.category) {
conditions.push("category = ?");
params.push(input.category);
}
if (input.severity) {
conditions.push("severity = ?");
params.push(input.severity);
}
if (input.actor) {
conditions.push("actor = ?");
params.push(input.actor);
}
if (input.fromTimestamp) {
conditions.push("timestamp >= ?");
params.push(input.fromTimestamp);
}
if (input.toTimestamp) {
conditions.push("timestamp <= ?");
params.push(input.toTimestamp);
}

const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
const limit = Math.max(1, input.limit);
const offset = Math.max(0, input.offset);

const countResult = yield* sql.unsafe<{ total: number }>(
`SELECT COUNT(*) as total FROM audit_log ${whereClause}`,
params,
);
const total = Number(countResult[0]?.total ?? 0);

const rows = yield* sql.unsafe<Record<string, unknown>>(
`SELECT id, timestamp, actor, actor_id, category, action, severity, project_id, thread_id, command_id, event_id, summary, detail, metadata
FROM audit_log ${whereClause}
ORDER BY timestamp DESC
LIMIT ? OFFSET ?`,
[...params, limit, offset],
);

const entries: AuditEntry[] = rows.map((r) => ({
id: r["id"] as AuditEntry["id"],
timestamp: r["timestamp"] as string,
actor: r["actor"] as AuditEntry["actor"],
actorId: (r["actor_id"] ?? null) as AuditEntry["actorId"],
category: r["category"] as AuditEntry["category"],
action: r["action"] as AuditEntry["action"],
severity: r["severity"] as AuditEntry["severity"],
projectId: (r["project_id"] ?? null) as AuditEntry["projectId"],
threadId: (r["thread_id"] ?? null) as AuditEntry["threadId"],
commandId: (r["command_id"] ?? null) as AuditEntry["commandId"],
eventId: (r["event_id"] ?? null) as AuditEntry["eventId"],
summary: r["summary"] as AuditEntry["summary"],
detail: (r["detail"] ?? null) as AuditEntry["detail"],
metadata:
typeof r["metadata"] === "string" ? JSON.parse(r["metadata"]) : (r["metadata"] ?? {}),
}));

return {
entries,
total: total as AuditQueryResult["total"],
hasMore: offset + limit < total,
} satisfies AuditQueryResult;
}).pipe(Effect.orDie);

return {
record,
query,
get streamEvents(): AuditLogServiceShape["streamEvents"] {
return Stream.fromPubSub(pubsub);
},
} satisfies AuditLogServiceShape;
});

export const AuditLogServiceLive = Layer.effect(AuditLogService, makeAuditLogService);
168 changes: 168 additions & 0 deletions apps/server/src/ci/Services/CIIntegrationService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
* CIIntegrationService - Service interface for CI pipeline integration.
*
* Tracks CI runs, manages feedback policies for automated responses to
* failures, and exposes a live event stream for CI status updates.
*
* @module CIIntegrationService
*/
import type {
CIFeedbackPolicy,
CIGetStatusInput,
CIGetStatusResult,
CIRun,
CIRunId,
CISetFeedbackPolicyInput,
CIStreamEvent,
CITriggerRerunInput,
} from "@t3tools/contracts";
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect";
import * as SqlClient from "effect/unstable/sql/SqlClient";

export interface CIIntegrationServiceShape {
/**
* Query CI run status with project/thread/branch filters.
*/
readonly getStatus: (input: CIGetStatusInput) => Effect.Effect<CIGetStatusResult>;

/**
* Record or update a CI run entry.
*/
readonly recordRun: (run: CIRun) => Effect.Effect<CIRun>;

/**
* Trigger a re-run of a CI pipeline. Records an audit-style activity
* event and returns the run being re-triggered.
*/
readonly triggerRerun: (input: CITriggerRerunInput) => Effect.Effect<CIRun>;

/**
* Create or update the feedback policy for a project.
*/
readonly setFeedbackPolicy: (input: CISetFeedbackPolicyInput) => Effect.Effect<CIFeedbackPolicy>;

/**
* Retrieve the feedback policy for a project, if one exists.
*/
readonly getFeedbackPolicy: (projectId: string) => Effect.Effect<CIFeedbackPolicy | null>;

/**
* Live stream of CI events (run updates, feedback triggers).
*
* Each access creates a fresh PubSub subscription so multiple consumers
* independently receive all events.
*/
readonly streamEvents: Stream.Stream<CIStreamEvent>;
}

export class CIIntegrationService extends ServiceMap.Service<
CIIntegrationService,
CIIntegrationServiceShape
>()("t3/ci/Services/CIIntegrationService") {}

const makeCIIntegrationService = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
const pubsub = yield* PubSub.unbounded<CIStreamEvent>();

const runFromRow = (r: Record<string, unknown>): CIRun => ({
id: r["id"] as CIRunId,
projectId: r["project_id"] as CIRun["projectId"],
threadId: (r["thread_id"] ?? null) as CIRun["threadId"],
turnId: (r["turn_id"] ?? null) as CIRun["turnId"],
provider: r["provider"] as CIRun["provider"],
workflowName: r["workflow_name"] as CIRun["workflowName"],
branch: r["branch"] as CIRun["branch"],
commitSha: r["commit_sha"] as CIRun["commitSha"],
status: r["status"] as CIRun["status"],
conclusion: (r["conclusion"] ?? null) as CIRun["conclusion"],
jobs: JSON.parse(r["jobs"] as string),
htmlUrl: (r["html_url"] ?? null) as CIRun["htmlUrl"],
startedAt: r["started_at"] as string,
completedAt: (r["completed_at"] ?? null) as CIRun["completedAt"],
updatedAt: r["updated_at"] as string,
});

const getStatus: CIIntegrationServiceShape["getStatus"] = (input) =>
Effect.gen(function* () {
const rows = yield* sql.unsafe<Record<string, unknown>>(
`SELECT * FROM ci_runs WHERE project_id = ?${input.threadId ? " AND thread_id = ?" : ""}${input.branch ? " AND branch = ?" : ""} ORDER BY started_at DESC LIMIT ?`,
[
input.projectId,
...(input.threadId ? [input.threadId] : []),
...(input.branch ? [input.branch] : []),
input.limit,
],
);
return { runs: rows.map(runFromRow), hasMore: rows.length === input.limit } as const;
}).pipe(Effect.orDie);

const recordRun: CIIntegrationServiceShape["recordRun"] = (run) =>
Effect.gen(function* () {
yield* sql`INSERT OR REPLACE INTO ci_runs (id, project_id, thread_id, turn_id, provider, workflow_name, branch, commit_sha, status, conclusion, jobs, html_url, started_at, completed_at, updated_at)
VALUES (${run.id}, ${run.projectId}, ${run.threadId}, ${run.turnId}, ${run.provider}, ${run.workflowName}, ${run.branch}, ${run.commitSha}, ${run.status}, ${run.conclusion}, ${JSON.stringify(run.jobs)}, ${run.htmlUrl}, ${run.startedAt}, ${run.completedAt}, ${run.updatedAt})`;
yield* PubSub.publish(pubsub, { type: "ci.run.updated" as const, run });
return run;
}).pipe(Effect.orDie);

const triggerRerun: CIIntegrationServiceShape["triggerRerun"] = (input) =>
Effect.gen(function* () {
const rows = yield* sql<
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: triggerRerun ignores projectId and failedOnly, so any run ID can be re-queued even if it belongs to another project or isn’t failed. Filter by project and enforce the failedOnly guard before re-queuing.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/server/src/ci/Services/CIIntegrationService.ts, line 109:

<comment>`triggerRerun` ignores `projectId` and `failedOnly`, so any run ID can be re-queued even if it belongs to another project or isn’t failed. Filter by project and enforce the `failedOnly` guard before re-queuing.</comment>

<file context>
@@ -0,0 +1,168 @@
+
+  const triggerRerun: CIIntegrationServiceShape["triggerRerun"] = (input) =>
+    Effect.gen(function* () {
+      const rows = yield* sql<
+        Record<string, unknown>
+      >`SELECT * FROM ci_runs WHERE id = ${input.runId}`;
</file context>
Fix with Cubic

Record<string, unknown>
>`SELECT * FROM ci_runs WHERE id = ${input.runId}`;
const run = rows[0];
if (!run) return yield* Effect.fail(new Error(`CI run ${input.runId} not found`));
const now = new Date().toISOString();
const requeued = {
...runFromRow(run),
status: "queued" as const,
conclusion: null,
updatedAt: now,
};
yield* recordRun(requeued);
return requeued;
}).pipe(Effect.orDie);

const setFeedbackPolicy: CIIntegrationServiceShape["setFeedbackPolicy"] = (input) =>
Effect.gen(function* () {
yield* sql`INSERT OR REPLACE INTO ci_feedback_policies (project_id, on_failure, auto_fix_max_attempts, watch_branches)
VALUES (${input.projectId}, ${input.onFailure}, ${input.autoFixMaxAttempts}, ${JSON.stringify(input.watchBranches)})`;
return {
projectId: input.projectId,
onFailure: input.onFailure,
autoFixMaxAttempts: input.autoFixMaxAttempts,
watchBranches: input.watchBranches,
} satisfies CIFeedbackPolicy;
}).pipe(Effect.orDie);

const getFeedbackPolicy: CIIntegrationServiceShape["getFeedbackPolicy"] = (projectId) =>
Effect.gen(function* () {
const rows = yield* sql<{
project_id: string;
on_failure: string;
auto_fix_max_attempts: number;
watch_branches: string;
}>`SELECT * FROM ci_feedback_policies WHERE project_id = ${projectId}`;
const row = rows[0];
if (!row) return null;
return {
projectId: row.project_id as CIFeedbackPolicy["projectId"],
onFailure: row.on_failure as CIFeedbackPolicy["onFailure"],
autoFixMaxAttempts: row.auto_fix_max_attempts,
watchBranches: JSON.parse(row.watch_branches),
} satisfies CIFeedbackPolicy;
}).pipe(Effect.orDie);

return {
getStatus,
recordRun,
triggerRerun,
setFeedbackPolicy,
getFeedbackPolicy,
streamEvents: Stream.fromPubSub(pubsub),
};
});

export const CIIntegrationServiceLive = Layer.effect(
CIIntegrationService,
makeCIIntegrationService,
);
Loading