Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/superlocalmemory/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,17 @@ def cmd_remember(args: Namespace) -> None:
log_dir = __import__("pathlib").Path.home() / ".superlocalmemory" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "async-remember.log"

kwargs = {}
if sys.platform == "win32":
kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
else:
kwargs["start_new_session"] = True

with open(log_file, "a") as lf:
subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=lf,
start_new_session=True,
**kwargs
)

if use_json:
Expand Down Expand Up @@ -1201,7 +1208,10 @@ def _kill_existing_on_port(target_port: int) -> None:
)
cmd = ps_result.stdout.strip().lower()
if "superlocalmemory" in cmd or "slm" in cmd or "uvicorn" in cmd:
os.kill(pid, signal.SIGTERM)
if sys.platform == "win32":
subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
else:
os.kill(pid, signal.SIGTERM)
print(f" Stopped previous dashboard (PID {pid})")
import time
time.sleep(1)
Expand Down
33 changes: 29 additions & 4 deletions src/superlocalmemory/cli/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,16 @@ def ensure_daemon() -> bool:
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "daemon.log"

kwargs = {}
if sys.platform == "win32":
kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
else:
kwargs["start_new_session"] = True

with open(log_file, "a") as lf:
subprocess.Popen(
cmd, stdout=lf, stderr=lf,
start_new_session=True,
**kwargs
)

# Wait for daemon to become ready (max 30s for cold start)
Expand All @@ -132,12 +138,26 @@ def stop_daemon() -> bool:
return True
try:
pid = int(_PID_FILE.read_text().strip())
os.kill(pid, signal.SIGTERM)
if sys.platform == "win32":
import subprocess
subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
else:
os.kill(pid, signal.SIGTERM)

# Wait for cleanup
for _ in range(20):
time.sleep(0.5)
try:
os.kill(pid, 0)
if sys.platform == "win32":
import ctypes
# Using OpenProcess to check if PID is alive on Windows
# PROCESS_QUERY_INFORMATION = 0x0400
handle = ctypes.windll.kernel32.OpenProcess(0x0400, False, pid)
if handle == 0:
break
ctypes.windll.kernel32.CloseHandle(handle)
else:
os.kill(pid, 0)
except ProcessLookupError:
break
_PID_FILE.unlink(missing_ok=True)
Expand Down Expand Up @@ -424,7 +444,12 @@ def start_server(port: int = _DEFAULT_PORT, idle_timeout: int | None = None) ->
_PORT_FILE.write_text(str(port))

# Handle SIGTERM for graceful shutdown
signal.signal(signal.SIGTERM, lambda *_: _shutdown_server() or os._exit(0))
if sys.platform != "win32":
signal.signal(signal.SIGTERM, lambda *_: _shutdown_server() or os._exit(0))
else:
# On Windows, we can use SIGBREAK if available
if hasattr(signal, 'SIGBREAK'):
signal.signal(signal.SIGBREAK, lambda *_: _shutdown_server() or os._exit(0))

# Pre-warm engine (this is the cold start — daemon absorbs it once)
logger.info("Daemon starting — warming engine...")
Expand Down
55 changes: 47 additions & 8 deletions src/superlocalmemory/core/embedding_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
os.environ["TORCH_DEVICE"] = "cpu"
# V3.3.17: Disable CoreML EP for ONNX Runtime — uses 3-5GB on ARM64 Mac.
os.environ["ORT_DISABLE_COREML"] = "1"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
os.environ["TQDM_DISABLE"] = "1"

# SIGTERM bridge: Docker/systemd send SIGTERM to stop processes.
# Without this, the worker ignores SIGTERM and becomes a zombie.
Expand All @@ -51,17 +53,45 @@ def _start_parent_watchdog() -> None:
process crashes, is killed, or exits without cleanup.

V3.3.7: Added after incident where orphaned workers consumed 33 GB.
V3.3.22: Enabled for Windows and added errno check.
"""
parent_pid = os.getppid()
try:
parent_pid = os.getppid()
except AttributeError:
return
if parent_pid <= 1:
return

def _watch() -> None:
import time
import errno
while True:
time.sleep(5)
try:
os.kill(parent_pid, 0)
except OSError:
os._exit(0)
if sys.platform == "win32":
# On Windows, os.kill(pid, 0) raises PermissionError if process exists
# but we don't have certain rights. psutil is more reliable.
try:
import psutil
if not psutil.pid_exists(parent_pid):
os._exit(0)
except ImportError:
# Fallback to os.kill if psutil is not available
try:
os.kill(parent_pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
os._exit(0)
# On Windows, EACCES/EPERM means it EXISTS but we can't kill it.
else:
os.kill(parent_pid, 0)
except OSError as e:
# ESRCH: No such process. EPERM: Access denied (process exists).
if e.errno == errno.ESRCH:
os._exit(0)
except Exception:
# Safety fallback for any other OS-specific issues
pass

t = threading.Thread(target=_watch, daemon=True, name="parent-watchdog")
t.start()
Expand Down Expand Up @@ -97,7 +127,7 @@ def _load_embedding_model(name: str) -> tuple:

def _worker_main() -> None:
"""Main loop: read JSON requests from stdin, write responses to stdout."""
_start_parent_watchdog() # V3.3.7: self-terminate if parent dies
_start_parent_watchdog() # V3.3.7: self-terminate if parent dies (V3.3.22: all platforms)

import numpy as np

Expand Down Expand Up @@ -169,10 +199,19 @@ def _worker_main() -> None:
# a worker that started at 300MB grows to 17GB+. Parent auto-respawns
# a fresh worker on next request (existing mechanism in embeddings.py).
# V3.3.21: Configurable via SLM_EMBED_WORKER_RSS_LIMIT_MB (default 2500MB).
import resource
_rss_limit = int(os.environ.get("SLM_EMBED_WORKER_RSS_LIMIT_MB", 2500))
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
if rss_mb > _rss_limit:
rss_mb = 0.0
if sys.platform != "win32":
import resource
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
else:
try:
import psutil
rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
except ImportError:
pass # psutil not available, skip RSS check on Windows

if rss_mb > 0 and rss_mb > _rss_limit:
sys.exit(0)

continue
Expand Down
8 changes: 7 additions & 1 deletion src/superlocalmemory/core/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ def _ensure_worker(self) -> None:
"TOKENIZERS_PARALLELISM": "false",
"TORCH_DEVICE": "cpu",
}
kwargs = {}
if sys.platform == "win32":
kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
else:
kwargs["start_new_session"] = True

self._worker_proc = subprocess.Popen(
[sys.executable, "-m", worker_module],
stdin=subprocess.PIPE,
Expand All @@ -419,7 +425,7 @@ def _ensure_worker(self) -> None:
text=True,
bufsize=1,
env=env,
start_new_session=True, # Prevent terminal signals bleeding to worker
**kwargs
)
logger.info("Embedding worker spawned (PID %d)", self._worker_proc.pid)
self._worker_ready = True
Expand Down
54 changes: 47 additions & 7 deletions src/superlocalmemory/core/recall_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import signal
import sys
import threading
import logging

# Force CPU BEFORE any torch import
os.environ["CUDA_VISIBLE_DEVICES"] = ""
Expand All @@ -30,6 +31,15 @@
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["TORCH_DEVICE"] = "cpu"

# V3.3.30: Configure logging to stderr immediately to avoid stdout corruption.
# If any component logs to stdout, it breaks the JSON protocol.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger("recall_worker")

# SIGTERM bridge: Docker/systemd send SIGTERM to stop processes.
# Without this, the worker ignores SIGTERM and becomes a zombie.
if sys.platform != "win32":
Expand All @@ -44,16 +54,37 @@ def _start_parent_watchdog() -> None:

V3.3.7: Added after incident where orphaned workers consumed 33 GB.
"""
parent_pid = os.getppid()
try:
parent_pid = os.getppid()
except AttributeError:
return # getppid not available on some platforms
if parent_pid <= 1:
return

def _watch() -> None:
import time
import errno
while True:
time.sleep(5)
try:
os.kill(parent_pid, 0)
except OSError:
os._exit(0)
if sys.platform == "win32":
try:
import psutil
if not psutil.pid_exists(parent_pid):
os._exit(0)
except ImportError:
try:
os.kill(parent_pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
os._exit(0)
else:
os.kill(parent_pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
os._exit(0)
except Exception:
pass

t = threading.Thread(target=_watch, daemon=True, name="parent-watchdog")
t.start()
Expand Down Expand Up @@ -321,10 +352,19 @@ def _worker_main() -> None:
except Exception as exc:
_respond({"ok": False, "error": str(exc)})

# V3.3.16: RSS watchdog — self-terminate if memory exceeds 1.5GB.
# V3.3.16: RSS watchdog — self-terminate if memory exceeds limit.
# Parent auto-respawns a fresh worker on next request.
import resource
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
rss_mb = 0.0
if sys.platform != "win32":
import resource
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
else:
try:
import psutil
rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
except ImportError:
pass # skip RSS check if psutil not available

if rss_mb > 2500:
sys.exit(0)

Expand Down
43 changes: 36 additions & 7 deletions src/superlocalmemory/core/reranker_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,37 @@ def _start_parent_watchdog() -> None:

V3.3.7: Added after incident where ~30 orphaned workers consumed 33 GB.
"""
parent_pid = os.getppid()
try:
parent_pid = os.getppid()
except AttributeError:
return
if parent_pid <= 1:
return

def _watch() -> None:
import time
import errno
while True:
time.sleep(5)
try:
os.kill(parent_pid, 0) # Check if parent is alive (signal 0)
except OSError:
# Parent is dead — self-terminate
os._exit(0)
if sys.platform == "win32":
try:
import psutil
if not psutil.pid_exists(parent_pid):
os._exit(0)
except ImportError:
try:
os.kill(parent_pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
os._exit(0)
else:
os.kill(parent_pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
os._exit(0)
except Exception:
pass

t = threading.Thread(target=_watch, daemon=True, name="parent-watchdog")
t.start()
Expand Down Expand Up @@ -181,8 +201,17 @@ def _worker_main() -> None:
_respond({"ok": False, "error": str(exc)})

# V3.3.16: RSS watchdog — same as embedding_worker
import resource
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
rss_mb = 0.0
if sys.platform != "win32":
import resource
rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 / 1024
else:
try:
import psutil
rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
except ImportError:
pass # skip RSS check if psutil not available

if rss_mb > 2500:
sys.exit(0)

Expand Down
8 changes: 7 additions & 1 deletion src/superlocalmemory/core/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ def _ensure_worker(self) -> None:
"TOKENIZERS_PARALLELISM": "false",
"TORCH_DEVICE": "cpu",
}
kwargs = {}
if sys.platform == "win32":
kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
else:
kwargs["start_new_session"] = True

self._proc = subprocess.Popen(
[sys.executable, "-m", "superlocalmemory.core.recall_worker"],
stdin=subprocess.PIPE,
Expand All @@ -244,7 +250,7 @@ def _ensure_worker(self) -> None:
text=True,
bufsize=1,
env=env,
start_new_session=True, # Prevent terminal signals bleeding to worker
**kwargs
)
logger.info("Recall worker spawned (PID %d)", self._proc.pid)
except Exception as exc:
Expand Down
Loading