Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,24 +623,28 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
)(function* (event, attachmentSideEffects) {
switch (event.type) {
case "thread.message-sent": {
const existingRows = yield* projectionThreadMessageRepository.listByThreadId({
threadId: event.payload.threadId,
const existingMessage = yield* projectionThreadMessageRepository.getByMessageId({
messageId: event.payload.messageId,
});
const previousMessage = Option.getOrUndefined(existingMessage);
const nextText = Option.match(existingMessage, {
onNone: () => event.payload.text,
onSome: (message) => {
if (event.payload.streaming) {
return `${message.text}${event.payload.text}`;
}
if (event.payload.text.length === 0) {
return message.text;
}
return event.payload.text;
},
});
const existingMessage = existingRows.find(
(row) => row.messageId === event.payload.messageId,
);
const nextText =
existingMessage && event.payload.streaming
? `${existingMessage.text}${event.payload.text}`
: existingMessage && event.payload.text.length === 0
? existingMessage.text
: event.payload.text;
const nextAttachments =
event.payload.attachments !== undefined
? yield* materializeAttachmentsForProjection({
attachments: event.payload.attachments,
})
: existingMessage?.attachments;
: previousMessage?.attachments;
yield* projectionThreadMessageRepository.upsert({
messageId: event.payload.messageId,
threadId: event.payload.threadId,
Expand All @@ -649,7 +653,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
text: nextText,
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
isStreaming: event.payload.streaming,
createdAt: existingMessage?.createdAt ?? event.payload.createdAt,
createdAt: previousMessage?.createdAt ?? event.payload.createdAt,
updatedAt: event.payload.updatedAt,
});
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ layer("ProjectionThreadMessageRepository", (it) => {
assert.equal(rows.length, 1);
assert.equal(rows[0]?.text, "updated");
assert.deepEqual(rows[0]?.attachments, persistedAttachments);

const rowById = yield* repository.getByMessageId({ messageId });
assert.equal(rowById._tag, "Some");
if (rowById._tag === "Some") {
assert.equal(rowById.value.text, "updated");
assert.deepEqual(rowById.value.attachments, persistedAttachments);
}
}),
);

Expand Down
63 changes: 49 additions & 14 deletions apps/server/src/persistence/Layers/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";
import { Effect, Layer, Schema, Struct } from "effect";
import { Effect, Layer, Option, Schema, Struct } from "effect";
import { ChatAttachment } from "@t3tools/contracts";

import { toPersistenceSqlError } from "../Errors.ts";
import {
GetProjectionThreadMessageInput,
ProjectionThreadMessageRepository,
type ProjectionThreadMessageRepositoryShape,
DeleteProjectionThreadMessagesInput,
Expand All @@ -19,6 +20,22 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields(
}),
);

function toProjectionThreadMessage(
row: Schema.Schema.Type<typeof ProjectionThreadMessageDbRowSchema>,
): ProjectionThreadMessage {
return {
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
};
}

const makeProjectionThreadMessageRepository = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;

Expand Down Expand Up @@ -74,6 +91,27 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
},
});

const getProjectionThreadMessageRow = SqlSchema.findOneOption({
Request: GetProjectionThreadMessageInput,
Result: ProjectionThreadMessageDbRowSchema,
execute: ({ messageId }) =>
sql`
SELECT
message_id AS "messageId",
thread_id AS "threadId",
turn_id AS "turnId",
role,
text,
attachments_json AS "attachments",
is_streaming AS "isStreaming",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM projection_thread_messages
WHERE message_id = ${messageId}
LIMIT 1
`,
});

const listProjectionThreadMessageRows = SqlSchema.findAll({
Request: ListProjectionThreadMessagesInput,
Result: ProjectionThreadMessageDbRowSchema,
Expand Down Expand Up @@ -109,24 +147,20 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")),
);

const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) =>
getProjectionThreadMessageRow(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"),
),
Effect.map(Option.map(toProjectionThreadMessage)),
);

const listByThreadId: ProjectionThreadMessageRepositoryShape["listByThreadId"] = (input) =>
listProjectionThreadMessageRows(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"),
),
Effect.map((rows) =>
rows.map((row) => ({
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
})),
),
Effect.map((rows) => rows.map(toProjectionThreadMessage)),
);

const deleteByThreadId: ProjectionThreadMessageRepositoryShape["deleteByThreadId"] = (input) =>
Expand All @@ -138,6 +172,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {

return {
upsert,
getByMessageId,
listByThreadId,
deleteByThreadId,
} satisfies ProjectionThreadMessageRepositoryShape;
Expand Down
15 changes: 14 additions & 1 deletion apps/server/src/persistence/Services/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
*/
import {
ChatAttachment,
OrchestrationMessageRole,
MessageId,
OrchestrationMessageRole,
ThreadId,
TurnId,
IsoDateTime,
} from "@t3tools/contracts";
import { Schema, ServiceMap } from "effect";
import type { Option } from "effect";
import type { Effect } from "effect";

import type { ProjectionRepositoryError } from "../Errors.ts";
Expand All @@ -37,6 +38,11 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({
});
export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type;

export const GetProjectionThreadMessageInput = Schema.Struct({
messageId: MessageId,
});
export type GetProjectionThreadMessageInput = typeof GetProjectionThreadMessageInput.Type;

export const DeleteProjectionThreadMessagesInput = Schema.Struct({
threadId: ThreadId,
});
Expand All @@ -55,6 +61,13 @@ export interface ProjectionThreadMessageRepositoryShape {
message: ProjectionThreadMessage,
) => Effect.Effect<void, ProjectionRepositoryError>;

/**
* Read a projected thread message by id.
*/
readonly getByMessageId: (
input: GetProjectionThreadMessageInput,
) => Effect.Effect<Option.Option<ProjectionThreadMessage>, ProjectionRepositoryError>;

/**
* List projected thread messages for a thread.
*
Expand Down
Loading