diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 61de04ad0e..0844cf8bb0 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -623,24 +623,28 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti )(function* (event, attachmentSideEffects) { switch (event.type) { case "thread.message-sent": { - const existingRows = yield* projectionThreadMessageRepository.listByThreadId({ - threadId: event.payload.threadId, + const existingMessage = yield* projectionThreadMessageRepository.getByMessageId({ + messageId: event.payload.messageId, + }); + const previousMessage = Option.getOrUndefined(existingMessage); + const nextText = Option.match(existingMessage, { + onNone: () => event.payload.text, + onSome: (message) => { + if (event.payload.streaming) { + return `${message.text}${event.payload.text}`; + } + if (event.payload.text.length === 0) { + return message.text; + } + return event.payload.text; + }, }); - const existingMessage = existingRows.find( - (row) => row.messageId === event.payload.messageId, - ); - const nextText = - existingMessage && event.payload.streaming - ? `${existingMessage.text}${event.payload.text}` - : existingMessage && event.payload.text.length === 0 - ? existingMessage.text - : event.payload.text; const nextAttachments = event.payload.attachments !== undefined ? yield* materializeAttachmentsForProjection({ attachments: event.payload.attachments, }) - : existingMessage?.attachments; + : previousMessage?.attachments; yield* projectionThreadMessageRepository.upsert({ messageId: event.payload.messageId, threadId: event.payload.threadId, @@ -649,7 +653,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti text: nextText, ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), isStreaming: event.payload.streaming, - createdAt: existingMessage?.createdAt ?? event.payload.createdAt, + createdAt: previousMessage?.createdAt ?? event.payload.createdAt, updatedAt: event.payload.updatedAt, }); return; diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts index b761387d47..5993ad6c20 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts @@ -55,6 +55,13 @@ layer("ProjectionThreadMessageRepository", (it) => { assert.equal(rows.length, 1); assert.equal(rows[0]?.text, "updated"); assert.deepEqual(rows[0]?.attachments, persistedAttachments); + + const rowById = yield* repository.getByMessageId({ messageId }); + assert.equal(rowById._tag, "Some"); + if (rowById._tag === "Some") { + assert.equal(rowById.value.text, "updated"); + assert.deepEqual(rowById.value.attachments, persistedAttachments); + } }), ); diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts index 6f0b25ddff..13b7086cec 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts @@ -1,10 +1,11 @@ import * as SqlClient from "effect/unstable/sql/SqlClient"; import * as SqlSchema from "effect/unstable/sql/SqlSchema"; -import { Effect, Layer, Schema, Struct } from "effect"; +import { Effect, Layer, Option, Schema, Struct } from "effect"; import { ChatAttachment } from "@t3tools/contracts"; import { toPersistenceSqlError } from "../Errors.ts"; import { + GetProjectionThreadMessageInput, ProjectionThreadMessageRepository, type ProjectionThreadMessageRepositoryShape, DeleteProjectionThreadMessagesInput, @@ -19,6 +20,22 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( }), ); +function toProjectionThreadMessage( + row: Schema.Schema.Type, +): ProjectionThreadMessage { + return { + messageId: row.messageId, + threadId: row.threadId, + turnId: row.turnId, + role: row.role, + text: row.text, + isStreaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + }; +} + const makeProjectionThreadMessageRepository = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; @@ -74,6 +91,27 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { }, }); + const getProjectionThreadMessageRow = SqlSchema.findOneOption({ + Request: GetProjectionThreadMessageInput, + Result: ProjectionThreadMessageDbRowSchema, + execute: ({ messageId }) => + sql` + SELECT + message_id AS "messageId", + thread_id AS "threadId", + turn_id AS "turnId", + role, + text, + attachments_json AS "attachments", + is_streaming AS "isStreaming", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_messages + WHERE message_id = ${messageId} + LIMIT 1 + `, + }); + const listProjectionThreadMessageRows = SqlSchema.findAll({ Request: ListProjectionThreadMessagesInput, Result: ProjectionThreadMessageDbRowSchema, @@ -109,24 +147,20 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")), ); + const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) => + getProjectionThreadMessageRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"), + ), + Effect.map(Option.map(toProjectionThreadMessage)), + ); + const listByThreadId: ProjectionThreadMessageRepositoryShape["listByThreadId"] = (input) => listProjectionThreadMessageRows(input).pipe( Effect.mapError( toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"), ), - Effect.map((rows) => - rows.map((row) => ({ - messageId: row.messageId, - threadId: row.threadId, - turnId: row.turnId, - role: row.role, - text: row.text, - isStreaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - })), - ), + Effect.map((rows) => rows.map(toProjectionThreadMessage)), ); const deleteByThreadId: ProjectionThreadMessageRepositoryShape["deleteByThreadId"] = (input) => @@ -138,6 +172,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { return { upsert, + getByMessageId, listByThreadId, deleteByThreadId, } satisfies ProjectionThreadMessageRepositoryShape; diff --git a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts index 00b1d399c6..b1a769cd91 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts @@ -8,13 +8,14 @@ */ import { ChatAttachment, - OrchestrationMessageRole, MessageId, + OrchestrationMessageRole, ThreadId, TurnId, IsoDateTime, } from "@t3tools/contracts"; import { Schema, ServiceMap } from "effect"; +import type { Option } from "effect"; import type { Effect } from "effect"; import type { ProjectionRepositoryError } from "../Errors.ts"; @@ -37,6 +38,11 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({ }); export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type; +export const GetProjectionThreadMessageInput = Schema.Struct({ + messageId: MessageId, +}); +export type GetProjectionThreadMessageInput = typeof GetProjectionThreadMessageInput.Type; + export const DeleteProjectionThreadMessagesInput = Schema.Struct({ threadId: ThreadId, }); @@ -55,6 +61,13 @@ export interface ProjectionThreadMessageRepositoryShape { message: ProjectionThreadMessage, ) => Effect.Effect; + /** + * Read a projected thread message by id. + */ + readonly getByMessageId: ( + input: GetProjectionThreadMessageInput, + ) => Effect.Effect, ProjectionRepositoryError>; + /** * List projected thread messages for a thread. *