Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 44 additions & 57 deletions apps/server/src/persistence/Layers/SmeMessages.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,13 @@
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";
import { Effect, Layer, Schema } from "effect";

import { toPersistenceDecodeError, toPersistenceSqlError } from "../Errors.ts";
import { Effect, Layer } from "effect";
import { toPersistenceSqlError } from "../Errors.ts";

import {
DeleteSmeMessagesByConversationInput,
ListSmeMessagesByConversationInput,
SmeMessageRepository,
SmeMessageRow,
type SmeMessageRepositoryShape,
type SmeMessageRow,
} from "../Services/SmeMessages.ts";

function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) {
return (cause: unknown) =>
Schema.isSchemaError(cause)
? toPersistenceDecodeError(decodeOperation)(cause)
: toPersistenceSqlError(sqlOperation)(cause);
}

/**
* DB row schema: isStreaming stored as INTEGER 0/1, mapped to/from boolean.
*/
const SmeMessageDbRow = Schema.Struct({
messageId: SmeMessageRow.fields.messageId,
conversationId: SmeMessageRow.fields.conversationId,
role: SmeMessageRow.fields.role,
text: SmeMessageRow.fields.text,
isStreaming: Schema.Number,
createdAt: SmeMessageRow.fields.createdAt,
updatedAt: SmeMessageRow.fields.updatedAt,
});

const makeSmeMessageRepository = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;

Expand All @@ -56,40 +32,51 @@ const makeSmeMessageRepository = Effect.gen(function* () {
}).pipe(Effect.mapError(toPersistenceSqlError("SmeMessageRepository.upsert:query")));

const listByConversationId: SmeMessageRepositoryShape["listByConversationId"] = (input) =>
Effect.gen(function* () {
const rows = yield* sql`
SELECT
message_id AS "messageId",
conversation_id AS "conversationId",
role,
text,
is_streaming AS "isStreaming",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM sme_messages
WHERE conversation_id = ${input.conversationId}
ORDER BY created_at ASC
`;
return rows.map((r: any) => ({
messageId: r.messageId,
conversationId: r.conversationId,
role: r.role,
text: r.text,
isStreaming: r.isStreaming !== 0,
createdAt: r.createdAt,
updatedAt: r.updatedAt,
})) as any;
}).pipe(
sql`
SELECT
message_id AS "messageId",
conversation_id AS "conversationId",
role,
text,
is_streaming AS "isStreaming",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM sme_messages
WHERE conversation_id = ${input.conversationId}
ORDER BY created_at ASC
`.pipe(
Effect.map((rows) =>
(
rows as ReadonlyArray<{
messageId: string;
conversationId: string;
role: string;
text: string;
isStreaming: number;
createdAt: string;
updatedAt: string;
}>
).map(
(r) =>
({
messageId: r.messageId as SmeMessageRow["messageId"],
conversationId: r.conversationId as SmeMessageRow["conversationId"],
role: r.role,
text: r.text,
isStreaming: r.isStreaming !== 0,
createdAt: r.createdAt,
updatedAt: r.updatedAt,
}) as SmeMessageRow,
),
),
Effect.mapError(toPersistenceSqlError("SmeMessageRepository.listByConversationId:query")),
);

const deleteByConversationId: SmeMessageRepositoryShape["deleteByConversationId"] = (input) =>
Effect.gen(function* () {
yield* sql`
DELETE FROM sme_messages
WHERE conversation_id = ${input.conversationId}
`;
}).pipe(
sql`
DELETE FROM sme_messages
WHERE conversation_id = ${input.conversationId}
`.pipe(
Effect.mapError(toPersistenceSqlError("SmeMessageRepository.deleteByConversationId:query")),
);

Expand Down
9 changes: 2 additions & 7 deletions apps/server/src/sme/Layers/SmeChatServiceLive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@
* @module SmeChatServiceLive
*/
import Anthropic from "@anthropic-ai/sdk";
import type {
SmeConversation,
SmeKnowledgeDocument,
SmeMessage,
SmeMessageEvent,
} from "@okcode/contracts";
import type { SmeConversation, SmeKnowledgeDocument, SmeMessage } from "@okcode/contracts";
import {
SME_MAX_DOCUMENT_SIZE_BYTES,
SME_MAX_DOCUMENTS_PER_PROJECT,
SME_MAX_CONVERSATIONS_PER_PROJECT,
} from "@okcode/contracts";
import { DateTime, Effect, Fiber, Layer, Option, Random, Ref } from "effect";
import { DateTime, Effect, Layer, Option, Random, Ref } from "effect";
import crypto from "node:crypto";

import { SmeKnowledgeDocumentRepository } from "../../persistence/Services/SmeKnowledgeDocuments.ts";
Expand Down
Loading
Loading