Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 124 additions & 131 deletions apps/server/src/persistence/NodeSqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,159 +72,152 @@ const checkNodeSqliteCompat = () => {
return Effect.void;
};

const makeWithDatabase = (
const makeWithDatabase = Effect.fn("makeWithDatabase")(function* (
options: SqliteClientConfig,
openDatabase: () => DatabaseSync,
): Effect.Effect<Client.SqlClient, never, Scope.Scope | Reactivity.Reactivity> =>
Effect.gen(function* () {
yield* checkNodeSqliteCompat();
): Effect.fn.Return<Client.SqlClient, never, Scope.Scope | Reactivity.Reactivity> {
yield* checkNodeSqliteCompat();

const compiler = Statement.makeCompilerSqlite(options.transformQueryNames);
const transformRows = options.transformResultNames
? Statement.defaultTransforms(options.transformResultNames).array
: undefined;
const compiler = Statement.makeCompilerSqlite(options.transformQueryNames);
const transformRows = options.transformResultNames
? Statement.defaultTransforms(options.transformResultNames).array
: undefined;

const makeConnection = Effect.gen(function* () {
const scope = yield* Effect.scope;
const db = openDatabase();
yield* Scope.addFinalizer(
scope,
Effect.sync(() => db.close()),
);
const makeConnection = Effect.gen(function* () {
const scope = yield* Effect.scope;
const db = openDatabase();
yield* Scope.addFinalizer(
scope,
Effect.sync(() => db.close()),
);

const statementReaderCache = new WeakMap<StatementSync, boolean>();
const hasRows = (statement: StatementSync): boolean => {
const cached = statementReaderCache.get(statement);
if (cached !== undefined) {
return cached;
const statementReaderCache = new WeakMap<StatementSync, boolean>();
const hasRows = (statement: StatementSync): boolean => {
const cached = statementReaderCache.get(statement);
if (cached !== undefined) {
return cached;
}
const value = statement.columns().length > 0;
statementReaderCache.set(statement, value);
return value;
};

const prepareCache = yield* Cache.make({
capacity: options.prepareCacheSize ?? 200,
timeToLive: options.prepareCacheTTL ?? Duration.minutes(10),
lookup: (sql: string) =>
Effect.try({
try: () => db.prepare(sql),
catch: (cause) =>
new SqlError({
reason: classifySqliteError(cause, {
message: "Failed to prepare statement",
operation: "prepare",
}),
}),
}),
});

const runStatement = (statement: StatementSync, params: ReadonlyArray<unknown>, raw: boolean) =>
Effect.withFiber<ReadonlyArray<any>, SqlError>((fiber) => {
statement.setReadBigInts(Boolean(ServiceMap.get(fiber.services, Client.SafeIntegers)));
try {
if (hasRows(statement)) {
return Effect.succeed(statement.all(...(params as any)));
}
const result = statement.run(...(params as any));
return Effect.succeed(raw ? (result as unknown as ReadonlyArray<any>) : []);
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, {
message: "Failed to execute statement",
operation: "execute",
}),
}),
);
}
const value = statement.columns().length > 0;
statementReaderCache.set(statement, value);
return value;
};
});

const run = (sql: string, params: ReadonlyArray<unknown>, raw = false) =>
Effect.flatMap(Cache.get(prepareCache, sql), (s) => runStatement(s, params, raw));

const prepareCache = yield* Cache.make({
capacity: options.prepareCacheSize ?? 200,
timeToLive: options.prepareCacheTTL ?? Duration.minutes(10),
lookup: (sql: string) =>
const runValues = (sql: string, params: ReadonlyArray<unknown>) =>
Effect.acquireUseRelease(
Cache.get(prepareCache, sql),
(statement) =>
Effect.try({
try: () => db.prepare(sql),
try: () => {
if (hasRows(statement)) {
statement.setReturnArrays(true);
// Safe to cast to array after we've setReturnArrays(true)
return statement.all(...(params as any)) as unknown as ReadonlyArray<
ReadonlyArray<unknown>
>;
}
statement.run(...(params as any));
return [];
},
catch: (cause) =>
new SqlError({
reason: classifySqliteError(cause, {
message: "Failed to prepare statement",
operation: "prepare",
message: "Failed to execute statement",
operation: "execute",
}),
}),
}),
});

const runStatement = (
statement: StatementSync,
params: ReadonlyArray<unknown>,
raw: boolean,
) =>
Effect.withFiber<ReadonlyArray<any>, SqlError>((fiber) => {
statement.setReadBigInts(Boolean(ServiceMap.get(fiber.services, Client.SafeIntegers)));
try {
(statement) =>
Effect.sync(() => {
if (hasRows(statement)) {
return Effect.succeed(statement.all(...(params as any)));
statement.setReturnArrays(false);
}
const result = statement.run(...(params as any));
return Effect.succeed(raw ? (result as unknown as ReadonlyArray<any>) : []);
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, {
message: "Failed to execute statement",
operation: "execute",
}),
}),
);
}
});

const run = (sql: string, params: ReadonlyArray<unknown>, raw = false) =>
Effect.flatMap(Cache.get(prepareCache, sql), (s) => runStatement(s, params, raw));

const runValues = (sql: string, params: ReadonlyArray<unknown>) =>
Effect.acquireUseRelease(
Cache.get(prepareCache, sql),
(statement) =>
Effect.try({
try: () => {
if (hasRows(statement)) {
statement.setReturnArrays(true);
// Safe to cast to array after we've setReturnArrays(true)
return statement.all(...(params as any)) as unknown as ReadonlyArray<
ReadonlyArray<unknown>
>;
}
statement.run(...(params as any));
return [];
},
catch: (cause) =>
new SqlError({
reason: classifySqliteError(cause, {
message: "Failed to execute statement",
operation: "execute",
}),
}),
}),
(statement) =>
Effect.sync(() => {
if (hasRows(statement)) {
statement.setReturnArrays(false);
}
}),
);
}),
);

return identity<Connection>({
execute(sql, params, rowTransform) {
return rowTransform ? Effect.map(run(sql, params), rowTransform) : run(sql, params);
},
executeRaw(sql, params) {
return run(sql, params, true);
},
executeValues(sql, params) {
return runValues(sql, params);
},
executeUnprepared(sql, params, rowTransform) {
const effect = runStatement(db.prepare(sql), params ?? [], false);
return rowTransform ? Effect.map(effect, rowTransform) : effect;
},
executeStream(_sql, _params) {
return Stream.die("executeStream not implemented");
},
});
return identity<Connection>({
execute(sql, params, rowTransform) {
return rowTransform ? Effect.map(run(sql, params), rowTransform) : run(sql, params);
},
executeRaw(sql, params) {
return run(sql, params, true);
},
executeValues(sql, params) {
return runValues(sql, params);
},
executeUnprepared(sql, params, rowTransform) {
const effect = runStatement(db.prepare(sql), params ?? [], false);
return rowTransform ? Effect.map(effect, rowTransform) : effect;
},
executeStream(_sql, _params) {
return Stream.die("executeStream not implemented");
},
});
});

const semaphore = yield* Semaphore.make(1);
const connection = yield* makeConnection;
const semaphore = yield* Semaphore.make(1);
const connection = yield* makeConnection;

const acquirer = semaphore.withPermits(1)(Effect.succeed(connection));
const transactionAcquirer = Effect.uninterruptibleMask((restore) => {
const fiber = Fiber.getCurrent()!;
const scope = ServiceMap.getUnsafe(fiber.services, Scope.Scope);
return Effect.as(
Effect.tap(restore(semaphore.take(1)), () =>
Scope.addFinalizer(scope, semaphore.release(1)),
),
connection,
);
});
const acquirer = semaphore.withPermits(1)(Effect.succeed(connection));
const transactionAcquirer = Effect.uninterruptibleMask((restore) => {
const fiber = Fiber.getCurrent()!;
const scope = ServiceMap.getUnsafe(fiber.services, Scope.Scope);
return Effect.as(
Effect.tap(restore(semaphore.take(1)), () => Scope.addFinalizer(scope, semaphore.release(1))),
connection,
);
});

return yield* Client.make({
acquirer,
compiler,
transactionAcquirer,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[ATTR_DB_SYSTEM_NAME, "sqlite"],
],
transformRows,
});
return yield* Client.make({
acquirer,
compiler,
transactionAcquirer,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[ATTR_DB_SYSTEM_NAME, "sqlite"],
],
transformRows,
});
});

const make = (
options: SqliteClientConfig,
Expand Down
Loading