diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index 1d1b949a0..f6e9a15e1 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -33,6 +33,7 @@ def __init__( self._redo_log = RedoLog(agfs) self._handles: Dict[str, LockHandle] = {} self._cleanup_task: Optional[asyncio.Task] = None + self._redo_task: Optional[asyncio.Task] = None self._running = False @property @@ -54,11 +55,18 @@ async def start(self) -> None: """Start background cleanup and redo recovery.""" self._running = True self._cleanup_task = asyncio.create_task(self._stale_cleanup_loop()) - await self._recover_pending_redo() + self._redo_task = asyncio.create_task(self._recover_pending_redo()) async def stop(self) -> None: """Stop cleanup and release all active locks.""" self._running = False + if self._redo_task: + self._redo_task.cancel() + try: + await self._redo_task + except asyncio.CancelledError: + pass + self._redo_task = None if self._cleanup_task: self._cleanup_task.cancel() try: @@ -299,11 +307,14 @@ async def _redo_session_memory(self, info: Dict[str, Any]) -> None: from openviking.session import create_session_compressor compressor = create_session_compressor(vikingdb=None) - memories = await compressor.extract_long_term_memories( - messages=messages, - user=user, - session_id=session_id, - ctx=ctx, + memories = await asyncio.wait_for( + compressor.extract_long_term_memories( + messages=messages, + user=user, + session_id=session_id, + ctx=ctx, + ), + timeout=60.0, ) logger.info(f"Redo: extracted {len(memories)} memories from {archive_uri}") except Exception as e: