Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 132 additions & 9 deletions src/cashet/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
import hashlib
import json
import logging
Expand Down Expand Up @@ -708,12 +709,12 @@ def delete_by_tags(self, tags: dict[str, str | None]) -> int:
return deleted

def _row_object_refs(self, row: sqlite3.Row) -> list[str]:
refs: list[str] = []
refs: set[str] = set()
if row["output_hash"]:
refs.append(row["output_hash"])
refs.add(row["output_hash"])
if row["input_refs"]:
refs.extend(set(json.loads(row["input_refs"])))
return refs
refs.update(json.loads(row["input_refs"]))
return list(refs)

def _object_ref_counts(self, conn: sqlite3.Connection) -> dict[str, int]:
counts: dict[str, int] = {}
Expand Down Expand Up @@ -844,16 +845,119 @@ async def __aexit__(self, *args: Any) -> None:
self._state.thread_lock.release()


class _SyncSQLiteFingerprintLock:
def __init__(self, lock_path: str) -> None:
self._state = _sqlite_lock_state(lock_path)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 If file_lock.acquire() fails in __enter__, the already-acquired thread_lock is never released, leading to a permanent lock. Wrap the acquisition in a try/except:

Suggested change
def __enter__(self) -> _SyncSQLiteFingerprintLock:
self._state.thread_lock.acquire()
try:
self._state.file_lock.acquire()
except Exception:
self._state.thread_lock.release()
raise
return self

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 If file_lock.acquire() fails in __enter__, the already-acquired thread_lock is never released, leading to a permanent lock. Wrap the acquisition in a try/except:

Suggested change
def __enter__(self) -> _SyncSQLiteFingerprintLock:
self._state.thread_lock.acquire()
try:
self._state.file_lock.acquire()
except Exception:
self._state.thread_lock.release()
raise
return self

def __enter__(self) -> _SyncSQLiteFingerprintLock:
self._state.thread_lock.acquire()
self._state.file_lock.acquire()
return self

def __exit__(self, *args: Any) -> None:
self._state.file_lock.release()
self._state.thread_lock.release()


class _DirectFingerprintLock:
def __init__(self, lock_path: str) -> None:
self._state = _sqlite_lock_state(lock_path)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Both __aenter__ and __aexit__ call thread_lock.acquire() and file_lock.acquire() synchronously, blocking the event loop. Additionally, if file_lock.acquire() raises in __aenter__, thread_lock is never released, causing a deadlock. Use asyncio.to_thread() and wrap with try/except:

Suggested change
self._state = _sqlite_lock_state(lock_path)
async def __aenter__(self) -> _DirectFingerprintLock:
await asyncio.to_thread(self._state.thread_lock.acquire)
try:
await asyncio.to_thread(self._state.file_lock.acquire)
except Exception:
self._state.thread_lock.release()
raise
return self
async def __aexit__(self, *args: Any) -> None:
try:
await asyncio.to_thread(self._state.file_lock.release)
finally:
self._state.thread_lock.release()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Both __aenter__ and __aexit__ call thread_lock.acquire() and file_lock.acquire() synchronously, blocking the event loop. Additionally, if file_lock.acquire() raises in __aenter__, thread_lock is never released, causing a deadlock. Use asyncio.to_thread() and wrap with try/except:

Suggested change
self._state = _sqlite_lock_state(lock_path)
async def __aenter__(self) -> _DirectFingerprintLock:
await asyncio.to_thread(self._state.thread_lock.acquire)
try:
await asyncio.to_thread(self._state.file_lock.acquire)
except Exception:
self._state.thread_lock.release()
raise
return self
async def __aexit__(self, *args: Any) -> None:
try:
await asyncio.to_thread(self._state.file_lock.release)
finally:
self._state.thread_lock.release()


async def __aenter__(self) -> _DirectFingerprintLock:
self._state.thread_lock.acquire()
self._state.file_lock.acquire()
return self

async def __aexit__(self, *args: Any) -> None:
self._state.file_lock.release()
self._state.thread_lock.release()


class _DirectAsyncSQLiteStore:
def __init__(self, core: _SQLiteStoreCore) -> None:
self._core = core
self._lock_paths: set[str] = set()

def _fingerprint_lock(self, fingerprint: str) -> _DirectFingerprintLock:
import hashlib

fp_hash = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
lock_path = str(self._core.root / f".lock-{fp_hash}")
self._lock_paths.add(lock_path)
return _DirectFingerprintLock(lock_path)

@property
def root(self) -> Path:
return self._core.root

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 All async methods in _DirectAsyncSQLiteStore call synchronous _core methods directly (e.g., self._core.put_blob(data)) without asyncio.to_thread(), blocking the event loop. Wrap every call with await asyncio.to_thread(...):

Suggested change
async def put_blob(self, data: bytes) -> ObjectRef:
return await asyncio.to_thread(self._core.put_blob, data)
# Repeat for all other methods (get_blob, put_commit, etc.)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _DirectAsyncSQLiteStore performs write operations (put_blob, put_commit, etc.) without any mutual exclusion. Concurrent coroutines may interleave writes to SQLite, leading to data corruption. Add an asyncio.Lock similar to AsyncSQLiteStore:

Suggested change
class _DirectAsyncSQLiteStore:
def __init__(self, core: _SQLiteStoreCore) -> None:
self._core = core
self._lock_paths: set[str] = set()
self._write_lock = asyncio.Lock()
async def put_blob(self, data: bytes) -> ObjectRef:
async with self._write_lock:
return await asyncio.to_thread(self._core.put_blob, data)
# Similarly for put_commit and other write methods

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 All async methods in _DirectAsyncSQLiteStore call synchronous _core methods directly (e.g., self._core.put_blob(data)) without asyncio.to_thread(), blocking the event loop. Wrap every call with await asyncio.to_thread(...):

Suggested change
async def put_blob(self, data: bytes) -> ObjectRef:
return await asyncio.to_thread(self._core.put_blob, data)
# Repeat for all other methods (get_blob, put_commit, etc.)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _DirectAsyncSQLiteStore performs write operations (put_blob, put_commit, etc.) without any mutual exclusion. Concurrent coroutines may interleave writes to SQLite, leading to data corruption. Add an asyncio.Lock similar to AsyncSQLiteStore:

Suggested change
class _DirectAsyncSQLiteStore:
def __init__(self, core: _SQLiteStoreCore) -> None:
self._core = core
self._lock_paths: set[str] = set()
self._write_lock = asyncio.Lock()
async def put_blob(self, data: bytes) -> ObjectRef:
async with self._write_lock:
return await asyncio.to_thread(self._core.put_blob, data)
# Similarly for put_commit and other write methods

async def put_blob(self, data: bytes) -> ObjectRef:
return self._core.put_blob(data)

async def get_blob(self, ref: ObjectRef) -> bytes:
return self._core.get_blob(ref)

async def put_commit(self, commit: Commit) -> None:
self._core.put_commit(commit)

async def get_commit(self, hash: str) -> Commit | None:
return self._core.get_commit(hash)

async def find_by_fingerprint(self, fingerprint: str) -> Commit | None:
return self._core.find_by_fingerprint(fingerprint)

async def find_running_by_fingerprint(self, fingerprint: str) -> Commit | None:
return self._core.find_running_by_fingerprint(fingerprint)

async def list_commits(
self,
func_name: str | None = None,
limit: int = 50,
status: TaskStatus | None = None,
tags: dict[str, str | None] | None = None,
) -> list[Commit]:
return self._core.list_commits(
func_name=func_name,
limit=limit,
status=status,
tags=tags,
)

async def get_history(self, hash: str) -> list[Commit]:
return self._core.get_history(hash)

async def stats(self) -> dict[str, int]:
return self._core.stats()

async def evict(self, older_than: datetime, max_size_bytes: int | None = None) -> int:
return self._core.evict(older_than, max_size_bytes=max_size_bytes)

async def delete_commit(self, hash: str) -> bool:
return self._core.delete_commit(hash)

async def delete_by_tags(self, tags: dict[str, str | None]) -> int:
return self._core.delete_by_tags(tags)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 close() calls self._core.close() synchronously, blocking the event loop. Wrap in asyncio.to_thread():

Suggested change
async def close(self) -> None:
await asyncio.to_thread(self._core.close)
for lock_path in self._lock_paths:
with contextlib.suppress(OSError):
Path(lock_path).unlink(missing_ok=True)
self._lock_paths.clear()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 close() calls self._core.close() synchronously, blocking the event loop. Wrap in asyncio.to_thread():

Suggested change
async def close(self) -> None:
await asyncio.to_thread(self._core.close)
for lock_path in self._lock_paths:
with contextlib.suppress(OSError):
Path(lock_path).unlink(missing_ok=True)
self._lock_paths.clear()

async def close(self) -> None:
self._core.close()
for lock_path in self._lock_paths:
with contextlib.suppress(OSError):
Path(lock_path).unlink(missing_ok=True)
self._lock_paths.clear()


class AsyncSQLiteStore:
def __init__(self, root: Path) -> None:
self._core = _SQLiteStoreCore(root)
self._write_lock = asyncio.Lock()
self._lock_paths: set[str] = set()

def _fingerprint_lock(self, fingerprint: str) -> _SQLiteFingerprintLock:
import hashlib

fp_hash = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
return _SQLiteFingerprintLock(str(self._core.root / f".lock-{fp_hash}"))
lock_path = str(self._core.root / f".lock-{fp_hash}")
self._lock_paths.add(lock_path)
return _SQLiteFingerprintLock(lock_path)

@property
def root(self) -> Path:
Expand Down Expand Up @@ -922,6 +1026,10 @@ async def delete_by_tags(self, tags: dict[str, str | None]) -> int:

async def close(self) -> None:
await asyncio.to_thread(self._core.close)
for lock_path in self._lock_paths:
with contextlib.suppress(OSError):
Path(lock_path).unlink(missing_ok=True)
self._lock_paths.clear()


class SQLiteStore:
Expand Down Expand Up @@ -950,13 +1058,28 @@ def objects_dir(self) -> Path:
def db_path(self) -> Path:
return self._async_store.db_path

@property
def _core(self) -> _SQLiteStoreCore:
return self._async_store._core # pyright: ignore[reportPrivateUsage]

@property
def _direct_async_store(self) -> _DirectAsyncSQLiteStore:
return _DirectAsyncSQLiteStore(self._core)

def _fingerprint_lock_sync(self, fingerprint: str) -> _SyncSQLiteFingerprintLock:
import hashlib
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 SQLiteStore._fingerprint_lock_sync directly accesses self._async_store._lock_paths, breaking encapsulation and requiring # pyright: ignore[reportPrivateUsage]. Add a public method in AsyncSQLiteStore (e.g., register_lock_path(path)) to avoid this.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 SQLiteStore._fingerprint_lock_sync directly accesses self._async_store._lock_paths, breaking encapsulation and requiring # pyright: ignore[reportPrivateUsage]. Add a public method in AsyncSQLiteStore (e.g., register_lock_path(path)) to avoid this.


fp_hash = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
lock_path = str(self.root / f".lock-{fp_hash}")
self._async_store._lock_paths.add(lock_path) # pyright: ignore[reportPrivateUsage]
return _SyncSQLiteFingerprintLock(lock_path)

def _connect(self, *, immediate: bool = False) -> sqlite3.Connection:
core: Any = self._async_store._core # pyright: ignore[reportPrivateUsage]
return core._connect(immediate=immediate)
core: _SQLiteStoreCore = self._core
return core._connect(immediate=immediate) # pyright: ignore[reportPrivateUsage]

def blob_exists(self, hash: str) -> bool:
core: Any = self._async_store._core # pyright: ignore[reportPrivateUsage]
return core.blob_exists(hash)
return self._core.blob_exists(hash)

def put_blob(self, data: bytes) -> ObjectRef:
return self._runner.call(self._async_store.put_blob(data))
Expand Down
Loading