diff --git a/examples/monitor/monitor.py b/examples/monitor/monitor.py deleted file mode 100644 index e5927a1..0000000 --- a/examples/monitor/monitor.py +++ /dev/null @@ -1,342 +0,0 @@ -#!/usr/bin/env python3 -""" -OmniQ observer - -Polls OmniQ monitoring keys and optional transactional keys to help validate -behavior under stress. - -Examples: - python omniq_observer.py --redis-url redis://omniq-redis:6379/0 - python omniq_observer.py --redis-url redis://omniq-redis:6379/0 --interval 0.5 --csv omniq_observer.csv - python omniq_observer.py --redis-url redis://omniq-redis:6379/0 --queues emails,pdfs --raw-verify -""" - -from __future__ import annotations - -import argparse -import csv -import os -import signal -import sys -import time -from dataclasses import dataclass -from typing import Iterable - -import redis - - -@dataclass -class QueueSnapshot: - ts_ms: int - queue: str - paused: int - waiting: int - group_waiting: int - waiting_total: int - active: int - delayed: int - failed: int - completed_kept: int - groups_ready: int - last_activity_ms: int - last_enqueue_ms: int - last_reserve_ms: int - last_finish_ms: int - raw_waiting: int | None = None - raw_group_waiting: int | None = None - raw_waiting_total: int | None = None - raw_active: int | None = None - raw_delayed: int | None = None - raw_failed: int | None = None - raw_completed_kept: int | None = None - raw_groups_ready: int | None = None - ok_waiting: int | None = None - ok_group_waiting: int | None = None - ok_waiting_total: int | None = None - ok_active: int | None = None - ok_delayed: int | None = None - ok_failed: int | None = None - ok_completed_kept: int | None = None - ok_groups_ready: int | None = None - - -class Observer: - def __init__( - self, - redis_client: redis.Redis, - queues: list[str] | None, - raw_verify: bool, - csv_path: str | None, - console_every: int, - ) -> None: - self.r = redis_client - self.explicit_queues = queues - self.raw_verify = raw_verify - self.csv_path = csv_path - self.console_every = max(1, console_every) - self._stop = False - self._loop_n = 0 - self._csv_file = None - self._csv_writer = None - - def stop(self, *_args) -> None: - self._stop = True - - def install_signal_handlers(self) -> None: - signal.signal(signal.SIGINT, self.stop) - signal.signal(signal.SIGTERM, self.stop) - - @staticmethod - def _to_i(value: object) -> int: - if value is None: - return 0 - if isinstance(value, bytes): - value = value.decode("utf-8", errors="replace") - if value == "": - return 0 - try: - return int(float(value)) - except Exception: - return 0 - - def _discover_queues(self) -> list[str]: - if self.explicit_queues is not None: - return self.explicit_queues - names = sorted( - q.decode("utf-8", errors="replace") if isinstance(q, bytes) else str(q) - for q in self.r.smembers("omniq:queues") - ) - return names - - def _read_stats(self, queue: str) -> dict[str, int]: - stats = self.r.hgetall(f"{queue}:stats") - decoded: dict[str, int] = {} - for k, v in stats.items(): - key = k.decode("utf-8", errors="replace") if isinstance(k, bytes) else str(k) - decoded[key] = self._to_i(v) - return decoded - - def _scan_group_waiting(self, queue: str) -> int: - total = 0 - cursor = 0 - pattern = f"{queue}:g:*:wait" - while True: - cursor, keys = self.r.scan(cursor=cursor, match=pattern, count=200) - if keys: - pipe = self.r.pipeline(transaction=False) - for key in keys: - pipe.llen(key) - lengths = pipe.execute() - total += sum(self._to_i(x) for x in lengths) - if cursor == 0: - break - return total - - def _read_raw(self, queue: str) -> dict[str, int]: - raw = { - "waiting": self._to_i(self.r.llen(f"{queue}:wait")), - "active": self._to_i(self.r.zcard(f"{queue}:active")), - "delayed": self._to_i(self.r.zcard(f"{queue}:delayed")), - "failed": self._to_i(self.r.llen(f"{queue}:failed")), - "completed_kept": self._to_i(self.r.llen(f"{queue}:completed")), - "groups_ready": self._to_i(self.r.zcard(f"{queue}:groups:ready")), - } - raw["group_waiting"] = self._scan_group_waiting(queue) - raw["waiting_total"] = raw["waiting"] + raw["group_waiting"] - return raw - - def snapshot_queue(self, queue: str) -> QueueSnapshot: - now_ms = int(time.time() * 1000) - stats = self._read_stats(queue) - paused = 1 if self.r.exists(f"{queue}:paused") else 0 - - snap = QueueSnapshot( - ts_ms=now_ms, - queue=queue, - paused=paused, - waiting=stats.get("waiting", 0), - group_waiting=stats.get("group_waiting", 0), - waiting_total=stats.get("waiting_total", 0), - active=stats.get("active", 0), - delayed=stats.get("delayed", 0), - failed=stats.get("failed", 0), - completed_kept=stats.get("completed_kept", 0), - groups_ready=stats.get("groups_ready", 0), - last_activity_ms=stats.get("last_activity_ms", 0), - last_enqueue_ms=stats.get("last_enqueue_ms", 0), - last_reserve_ms=stats.get("last_reserve_ms", 0), - last_finish_ms=stats.get("last_finish_ms", 0), - ) - - if self.raw_verify: - raw = self._read_raw(queue) - snap.raw_waiting = raw["waiting"] - snap.raw_group_waiting = raw["group_waiting"] - snap.raw_waiting_total = raw["waiting_total"] - snap.raw_active = raw["active"] - snap.raw_delayed = raw["delayed"] - snap.raw_failed = raw["failed"] - snap.raw_completed_kept = raw["completed_kept"] - snap.raw_groups_ready = raw["groups_ready"] - - snap.ok_waiting = int(snap.waiting == snap.raw_waiting) - snap.ok_group_waiting = int(snap.group_waiting == snap.raw_group_waiting) - snap.ok_waiting_total = int(snap.waiting_total == snap.raw_waiting_total) - snap.ok_active = int(snap.active == snap.raw_active) - snap.ok_delayed = int(snap.delayed == snap.raw_delayed) - snap.ok_failed = int(snap.failed == snap.raw_failed) - snap.ok_completed_kept = int(snap.completed_kept == snap.raw_completed_kept) - snap.ok_groups_ready = int(snap.groups_ready == snap.raw_groups_ready) - - return snap - - def _ensure_csv(self) -> None: - if not self.csv_path or self._csv_writer is not None: - return - path = Path(self.csv_path) - path.parent.mkdir(parents=True, exist_ok=True) - exists = path.exists() and path.stat().st_size > 0 - self._csv_file = path.open("a", newline="", encoding="utf-8") - self._csv_writer = csv.DictWriter(self._csv_file, fieldnames=list(QueueSnapshot.__dataclass_fields__.keys())) - if not exists: - self._csv_writer.writeheader() - self._csv_file.flush() - - def write_csv(self, snaps: Iterable[QueueSnapshot]) -> None: - if not self.csv_path: - return - self._ensure_csv() - assert self._csv_writer is not None - for snap in snaps: - self._csv_writer.writerow(snap.__dict__) - assert self._csv_file is not None - self._csv_file.flush() - - def print_console(self, snaps: list[QueueSnapshot]) -> None: - if not snaps: - print(f"[{int(time.time())}] no queues discovered") - return - - headers = ["queue", "paused", "wait", "gwait", "wtotal", "active", "delayed", "failed", "done", "gready"] - rows = [] - for s in snaps: - rows.append([ - s.queue, - str(s.paused), - str(s.waiting), - str(s.group_waiting), - str(s.waiting_total), - str(s.active), - str(s.delayed), - str(s.failed), - str(s.completed_kept), - str(s.groups_ready), - ]) - - widths = [len(h) for h in headers] - for row in rows: - for i, cell in enumerate(row): - widths[i] = max(widths[i], len(cell)) - - def fmt(row: list[str]) -> str: - return " ".join(cell.ljust(widths[i]) for i, cell in enumerate(row)) - - print() - print(fmt(headers)) - print(fmt(["-" * w for w in widths])) - for row in rows: - print(fmt(row)) - - if self.raw_verify: - mismatches = [] - for s in snaps: - bad = [] - if s.ok_waiting == 0: - bad.append("waiting") - if s.ok_group_waiting == 0: - bad.append("group_waiting") - if s.ok_waiting_total == 0: - bad.append("waiting_total") - if s.ok_active == 0: - bad.append("active") - if s.ok_delayed == 0: - bad.append("delayed") - if s.ok_failed == 0: - bad.append("failed") - if s.ok_completed_kept == 0: - bad.append("completed_kept") - if s.ok_groups_ready == 0: - bad.append("groups_ready") - if bad: - mismatches.append(f"{s.queue}: {', '.join(bad)}") - if mismatches: - print("verify:", " | ".join(mismatches)) - else: - print("verify: all stats match raw keys") - - def run(self, interval_s: float, duration_s: float | None, once: bool) -> int: - self.install_signal_handlers() - started = time.monotonic() - - while not self._stop: - queues = self._discover_queues() - snaps = [self.snapshot_queue(q) for q in queues] - self.write_csv(snaps) - - if self._loop_n % self.console_every == 0: - self.print_console(snaps) - - self._loop_n += 1 - - if once: - break - if duration_s is not None and (time.monotonic() - started) >= duration_s: - break - - time.sleep(interval_s) - - if self._csv_file is not None: - self._csv_file.close() - return 0 - - -def build_parser() -> argparse.ArgumentParser: - p = argparse.ArgumentParser(description="Observe OmniQ queue stats and optional raw Redis validation.") - p.add_argument("--redis-url", default=os.getenv("REDIS_URL", "redis://omniq-redis:6379/0")) - p.add_argument("--queues", default="", help="Comma-separated queue names. Empty means discover from omniq:queues.") - p.add_argument("--interval", type=float, default=1.0, help="Polling interval in seconds.") - p.add_argument("--duration", type=float, default=None, help="Optional total run duration in seconds.") - p.add_argument("--csv", default="", help="Optional CSV output path.") - p.add_argument("--raw-verify", action="store_true", help="Compare monitoring stats against transactional keys.") - p.add_argument("--console-every", type=int, default=1, help="Print every N loops.") - p.add_argument("--once", action="store_true", help="Run a single snapshot and exit.") - return p - - -def main() -> int: - args = build_parser().parse_args() - try: - client = redis.Redis.from_url(args.redis_url, decode_responses=False) - client.ping() - except Exception as exc: - print(f"Redis connection failed: {exc}", file=sys.stderr) - return 2 - - queues = [q.strip() for q in args.queues.split(",") if q.strip()] or None - - observer = Observer( - redis_client=client, - queues=queues, - raw_verify=args.raw_verify, - csv_path=args.csv or None, - console_every=args.console_every, - ) - return observer.run( - interval_s=max(0.05, args.interval), - duration_s=args.duration, - once=args.once, - ) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/pyproject.toml b/pyproject.toml index 7bee49a..48907f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "omniq" -version = "2.0.0" +version = "3.0.0" authors = [ { name="Not Empty Foundation", email="dev@not-empty.org" }, ] diff --git a/src/omniq/__init__.py b/src/omniq/__init__.py index 0150782..a6a42e9 100644 --- a/src/omniq/__init__.py +++ b/src/omniq/__init__.py @@ -1,3 +1,12 @@ from .client import OmniqClient from .consumer import consume +from .monitor import QueueMonitor from .types import JobCtx, PayloadT + +__all__ = [ + "OmniqClient", + "QueueMonitor", + "consume", + "JobCtx", + "PayloadT", +] \ No newline at end of file diff --git a/src/omniq/_ops.py b/src/omniq/_ops.py index 9059c62..1afa8dd 100644 --- a/src/omniq/_ops.py +++ b/src/omniq/_ops.py @@ -7,7 +7,7 @@ from .clock import now_ms from .ids import new_ulid -from .types import ReservePaused, ReserveJob, ReserveResult, AckFailResult, BatchRemoveResult, BatchRetryFailedResult +from .types import ReservePaused, ReserveJob, ReserveResult, AckFailResult, BatchRemoveResult, BatchRetryFailedResult, BatchResultItem from .transport import RedisLike from .scripts import OmniqScripts from .helper import queue_base, queue_anchor, childs_anchor @@ -373,7 +373,7 @@ def retry_failed_batch( i += 3 else: i += 2 - out.append((job_id, status, reason)) + out.append(BatchResultItem(job_id=job_id, status=status, reason=reason)) return out def remove_job(self, *, queue: str, job_id: str, lane: str) -> str: @@ -442,7 +442,7 @@ def remove_jobs_batch( i += 3 else: i += 2 - out.append((job_id, status, reason)) + out.append(BatchResultItem(job_id=job_id, status=status, reason=reason)) return out def childs_init(self, *, key: str, expected: int) -> None: diff --git a/src/omniq/client.py b/src/omniq/client.py index 6a8d949..3305fe4 100644 --- a/src/omniq/client.py +++ b/src/omniq/client.py @@ -90,11 +90,12 @@ def publish( payload: Any, job_id: Optional[str] = None, max_attempts: int = 3, - timeout_ms: int = 60_000, + timeout_ms: int = 30_000, backoff_ms: int = 5_000, due_ms: int = 0, gid: Optional[str] = None, group_limit: int = 0, + now_ms_override: int = 0, ) -> str: return self._ops.publish( queue=queue, @@ -106,6 +107,7 @@ def publish( due_ms=due_ms, gid=gid, group_limit=group_limit, + now_ms_override=now_ms_override, ) def publish_json( @@ -115,11 +117,12 @@ def publish_json( payload: Any, job_id: Optional[str] = None, max_attempts: int = 3, - timeout_ms: int = 60_000, + timeout_ms: int = 30_000, backoff_ms: int = 5_000, due_ms: int = 0, gid: Optional[str] = None, group_limit: int = 0, + now_ms_override: int = 0, ) -> str: if isinstance(payload, (dict, list)): structured = payload @@ -154,6 +157,7 @@ def publish_json( due_ms=due_ms, gid=gid, group_limit=group_limit, + now_ms_override=now_ms_override, ) def reserve(self, *, queue: str, now_ms_override: int = 0) -> ReserveResult: diff --git a/src/omniq/core/scripts/ack_fail.lua b/src/omniq/core/scripts/ack_fail.lua index 3b63edf..32e031e 100644 --- a/src/omniq/core/scripts/ack_fail.lua +++ b/src/omniq/core/scripts/ack_fail.lua @@ -17,13 +17,16 @@ end local base = derive_base(anchor) -local k_job = base .. ":job:" .. job_id -local k_active = base .. ":active" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_job = base .. ":job:" .. job_id +local k_active = base .. ":active" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_active = base .. ":idx:active" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -114,9 +117,13 @@ if attempt >= max_attempts then redis.call("HSET", k_job, "state", "failed", "updated_ms", tostring(now_ms), + "failed_ms", tostring(now_ms), "lease_token", "", "lock_until_ms", "" ) + redis.call("ZREM", k_idx_active, job_id) + redis.call("ZADD", k_idx_failed, now_ms, job_id) + redis.call("LPUSH", k_failed, job_id) redis.call("HINCRBY", k_stats, "failed", 1) @@ -131,6 +138,9 @@ redis.call("HSET", k_job, "lease_token", "", "lock_until_ms", "" ) +redis.call("ZREM", k_idx_active, job_id) +redis.call("ZADD", k_idx_delayed, now_ms, job_id) + redis.call("ZADD", k_delayed, due_ms, job_id) redis.call("HINCRBY", k_stats, "delayed", 1) diff --git a/src/omniq/core/scripts/ack_success.lua b/src/omniq/core/scripts/ack_success.lua index e5ada30..2de1ad7 100644 --- a/src/omniq/core/scripts/ack_success.lua +++ b/src/omniq/core/scripts/ack_success.lua @@ -16,12 +16,14 @@ end local base = derive_base(anchor) -local k_job = base .. ":job:" .. job_id -local k_active = base .. ":active" -local k_completed = base .. ":completed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_job = base .. ":job:" .. job_id +local k_active = base .. ":active" +local k_completed = base .. ":completed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_active = base .. ":idx:active" +local k_idx_completed = base .. ":idx:completed" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -73,10 +75,14 @@ redis.call("SADD", k_queues, base) redis.call("HSET", k_job, "state", "completed", "updated_ms", tostring(now_ms), + "completed_ms", tostring(now_ms), "lease_token", "", "lock_until_ms", "" ) +redis.call("ZREM", k_idx_active, job_id) +redis.call("ZADD", k_idx_completed, now_ms, job_id) + hincrby_floor0(k_stats, "active", -1) redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms), @@ -102,6 +108,7 @@ redis.call("LPUSH", k_completed, job_id) while redis.call("LLEN", k_completed) > KEEP_COMPLETED do local old_id = redis.call("RPOP", k_completed) if old_id then + redis.call("ZREM", k_idx_completed, old_id) redis.call("DEL", base .. ":job:" .. old_id) end end diff --git a/src/omniq/core/scripts/enqueue.lua b/src/omniq/core/scripts/enqueue.lua index aa3c56b..c8fb96f 100644 --- a/src/omniq/core/scripts/enqueue.lua +++ b/src/omniq/core/scripts/enqueue.lua @@ -22,12 +22,14 @@ end local base = derive_base(anchor) -local k_job = base .. ":job:" .. job_id -local k_delayed = base .. ":delayed" -local k_wait = base .. ":wait" -local k_has_groups = base .. ":has_groups" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_job = base .. ":job:" .. job_id +local k_delayed = base .. ":delayed" +local k_wait = base .. ":wait" +local k_has_groups = base .. ":has_groups" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_delayed = base .. ":idx:delayed" local is_grouped = (gid ~= nil and gid ~= "") redis.call("SADD", k_queues, base) @@ -43,7 +45,12 @@ if is_grouped then "timeout_ms", tostring(timeout_ms), "backoff_ms", tostring(backoff_ms), "created_ms", tostring(now_ms), - "updated_ms", tostring(now_ms) + "updated_ms", tostring(now_ms), + "queued_ms", tostring(now_ms), + "first_started_ms", "", + "last_started_ms", "", + "completed_ms", "", + "failed_ms", "" ) redis.call("SET", k_has_groups, "1") @@ -64,12 +71,18 @@ else "timeout_ms", tostring(timeout_ms), "backoff_ms", tostring(backoff_ms), "created_ms", tostring(now_ms), - "updated_ms", tostring(now_ms) + "updated_ms", tostring(now_ms), + "queued_ms", tostring(now_ms), + "first_started_ms", "", + "last_started_ms", "", + "completed_ms", "", + "failed_ms", "" ) end if due_ms ~= nil and due_ms > now_ms then redis.call("ZADD", k_delayed, due_ms, job_id) + redis.call("ZADD", k_idx_delayed, now_ms, job_id) redis.call("HSET", k_job, "state", "delayed", "due_ms", tostring(due_ms)) redis.call("HINCRBY", k_stats, "delayed", 1) redis.call("HSET", k_stats, @@ -80,6 +93,7 @@ else if is_grouped then local k_gwait = base .. ":g:" .. gid .. ":wait" redis.call("RPUSH", k_gwait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) redis.call("HINCRBY", k_stats, "group_waiting", 1) redis.call("HINCRBY", k_stats, "waiting_total", 1) redis.call("HSET", k_stats, @@ -100,6 +114,7 @@ else end else redis.call("RPUSH", k_wait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) redis.call("HINCRBY", k_stats, "waiting", 1) redis.call("HINCRBY", k_stats, "waiting_total", 1) redis.call("HSET", k_stats, diff --git a/src/omniq/core/scripts/promote_delayed.lua b/src/omniq/core/scripts/promote_delayed.lua index 089d929..d10c10e 100644 --- a/src/omniq/core/scripts/promote_delayed.lua +++ b/src/omniq/core/scripts/promote_delayed.lua @@ -14,9 +14,11 @@ end local base = derive_base(anchor) -local k_delayed = base .. ":delayed" -local k_wait = base .. ":wait" -local k_gready = base .. ":groups:ready" +local k_delayed = base .. ":delayed" +local k_wait = base .. ":wait" +local k_gready = base .. ":groups:ready" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_wait = base .. ":idx:wait" local k_stats = base .. ":stats" local k_queues = "omniq:queues" @@ -59,6 +61,9 @@ for i=1,#ids do local k_job = base .. ":job:" .. job_id redis.call("HSET", k_job, "state", "wait", "updated_ms", tostring(now_ms)) + redis.call("ZREM", k_idx_delayed, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) + dec_delayed = dec_delayed - 1 local gid = redis.call("HGET", k_job, "gid") diff --git a/src/omniq/core/scripts/reap_expired.lua b/src/omniq/core/scripts/reap_expired.lua index 1f17c01..d98e7f4 100644 --- a/src/omniq/core/scripts/reap_expired.lua +++ b/src/omniq/core/scripts/reap_expired.lua @@ -14,12 +14,15 @@ end local base = derive_base(anchor) -local k_active = base .. ":active" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_active = base .. ":active" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_active = base .. ":idx:active" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -73,6 +76,7 @@ for i=1,#ids do local k_job = base .. ":job:" .. job_id if redis.call("EXISTS", k_job) == 0 then + redis.call("ZREM", k_idx_active, job_id) reaped = reaped + 1 else redis.call("HSET", k_job, "lease_token", "") @@ -100,9 +104,13 @@ for i=1,#ids do redis.call("HSET", k_job, "state", "failed", "updated_ms", tostring(now_ms), + "failed_ms", tostring(now_ms), "lease_token", "", "lock_until_ms", "" ) + redis.call("ZREM", k_idx_active, job_id) + redis.call("ZADD", k_idx_failed, now_ms, job_id) + redis.call("LPUSH", k_failed, job_id) inc_failed = inc_failed + 1 else @@ -114,6 +122,9 @@ for i=1,#ids do "lease_token", "", "lock_until_ms", "" ) + redis.call("ZREM", k_idx_active, job_id) + redis.call("ZADD", k_idx_delayed, now_ms, job_id) + redis.call("ZADD", k_delayed, due_ms, job_id) inc_delayed = inc_delayed + 1 end diff --git a/src/omniq/core/scripts/remove_job.lua b/src/omniq/core/scripts/remove_job.lua index d24e7a4..c5b71da 100644 --- a/src/omniq/core/scripts/remove_job.lua +++ b/src/omniq/core/scripts/remove_job.lua @@ -45,15 +45,20 @@ end local base = derive_base(anchor) -local k_job = base .. ":job:" .. job_id -local k_wait = base .. ":wait" -local k_active = base .. ":active" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_completed = base .. ":completed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_job = base .. ":job:" .. job_id +local k_wait = base .. ":wait" +local k_active = base .. ":active" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_completed = base .. ":completed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_active = base .. ":idx:active" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" +local k_idx_completed = base .. ":idx:completed" if redis.call("EXISTS", k_job) ~= 1 then return {"ERR", "NO_JOB"} @@ -84,7 +89,6 @@ if lane == "gwait" and (gid == nil or gid == "") then return {"ERR", "LANE_MISMATCH"} end -local ginflight_dec = 0 if gid ~= "" then local lt = redis.call("HGET", k_job, "lease_token") or "" local lu = redis.call("HGET", k_job, "lock_until_ms") or "" @@ -92,7 +96,6 @@ if gid ~= "" then if looks_reserved then local k_ginflight = base .. ":g:" .. gid .. ":inflight" dec_floor0(k_ginflight) - ginflight_dec = 1 end end @@ -117,8 +120,6 @@ elseif lane == "completed" then elseif lane == "gwait" then local k_gwait = base .. ":g:" .. gid .. ":wait" - local was_ready = (redis.call("ZSCORE", k_gready, gid) ~= false) - removed = redis.call("LREM", k_gwait, 1, job_id) local inflight = to_i(redis.call("GET", base .. ":g:" .. gid .. ":inflight")) @@ -149,19 +150,24 @@ redis.call("DEL", k_job) redis.call("SADD", k_queues, base) if lane == "wait" then + redis.call("ZREM", k_idx_wait, job_id) hincrby_floor0(k_stats, "waiting", -1) hincrby_floor0(k_stats, "waiting_total", -1) elseif lane == "delayed" then + redis.call("ZREM", k_idx_delayed, job_id) hincrby_floor0(k_stats, "delayed", -1) elseif lane == "failed" then + redis.call("ZREM", k_idx_failed, job_id) hincrby_floor0(k_stats, "failed", -1) elseif lane == "completed" then + redis.call("ZREM", k_idx_completed, job_id) hincrby_floor0(k_stats, "completed_kept", -1) elseif lane == "gwait" then + redis.call("ZREM", k_idx_wait, job_id) hincrby_floor0(k_stats, "group_waiting", -1) hincrby_floor0(k_stats, "waiting_total", -1) diff --git a/src/omniq/core/scripts/remove_jobs_batch.lua b/src/omniq/core/scripts/remove_jobs_batch.lua index 7e4c93d..0c4b6dc 100644 --- a/src/omniq/core/scripts/remove_jobs_batch.lua +++ b/src/omniq/core/scripts/remove_jobs_batch.lua @@ -56,14 +56,19 @@ end local base = derive_base(anchor) -local k_active = base .. ":active" -local k_wait = base .. ":wait" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_completed = base .. ":completed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_active = base .. ":active" +local k_wait = base .. ":wait" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_completed = base .. ":completed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_active = base .. ":idx:active" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" +local k_idx_completed = base .. ":idx:completed" local out = {} @@ -137,6 +142,8 @@ for i = 1, count do if removed <= 0 then push(job_id, "ERR", "NOT_IN_LANE") else + redis.call("ZREM", k_idx_wait, job_id) + redis.call("DEL", k_job) dec_waiting = dec_waiting - 1 dec_waiting_total = dec_waiting_total - 1 @@ -152,6 +159,8 @@ for i = 1, count do if removed <= 0 then push(job_id, "ERR", "NOT_IN_LANE") else + redis.call("ZREM", k_idx_delayed, job_id) + redis.call("DEL", k_job) dec_delayed = dec_delayed - 1 removed_ok = removed_ok + 1 @@ -164,6 +173,8 @@ for i = 1, count do if removed <= 0 then push(job_id, "ERR", "NOT_IN_LANE") else + redis.call("ZREM", k_idx_failed, job_id) + redis.call("DEL", k_job) dec_failed = dec_failed - 1 removed_ok = removed_ok + 1 @@ -175,6 +186,8 @@ for i = 1, count do if removed <= 0 then push(job_id, "ERR", "NOT_IN_LANE") else + redis.call("ZREM", k_idx_completed, job_id) + redis.call("DEL", k_job) dec_completed_kept = dec_completed_kept - 1 removed_ok = removed_ok + 1 @@ -203,6 +216,8 @@ for i = 1, count do end end + redis.call("ZREM", k_idx_wait, job_id) + redis.call("DEL", k_job) dec_group_waiting = dec_group_waiting - 1 dec_waiting_total = dec_waiting_total - 1 diff --git a/src/omniq/core/scripts/reserve.lua b/src/omniq/core/scripts/reserve.lua index 5adb1f3..3e17f22 100644 --- a/src/omniq/core/scripts/reserve.lua +++ b/src/omniq/core/scripts/reserve.lua @@ -19,13 +19,15 @@ end local DEFAULT_GROUP_LIMIT = 1 local MAX_GROUP_POPS = 10 -local k_wait = base .. ":wait" -local k_active = base .. ":active" -local k_gready = base .. ":groups:ready" -local k_rr = base .. ":lane:rr" -local k_token_seq = base .. ":lease:seq" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_wait = base .. ":wait" +local k_active = base .. ":active" +local k_gready = base .. ":groups:ready" +local k_rr = base .. ":lane:rr" +local k_token_seq = base .. ":lease:seq" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_active = base .. ":idx:active" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -59,17 +61,28 @@ local function lease_job(job_id) local payload = redis.call("HGET", k_job, "payload") or "" local gid = redis.call("HGET", k_job, "gid") or "" + local first_started_ms = redis.call("HGET", k_job, "first_started_ms") or "" local lease_token = new_lease_token(job_id) + if first_started_ms == "" then + redis.call("HSET", k_job, + "first_started_ms", tostring(now_ms) + ) + end + redis.call("HSET", k_job, "state", "active", "attempt", tostring(attempt), "lock_until_ms", tostring(lock_until), "lease_token", lease_token, - "updated_ms", tostring(now_ms) + "updated_ms", tostring(now_ms), + "last_started_ms", tostring(now_ms) ) + redis.call("ZREM", k_idx_wait, job_id) + redis.call("ZADD", k_idx_active, now_ms, job_id) + redis.call("ZADD", k_active, lock_until, job_id) redis.call("HINCRBY", k_stats, "active", 1) diff --git a/src/omniq/core/scripts/retry_failed.lua b/src/omniq/core/scripts/retry_failed.lua index e7ec45f..f23ee01 100644 --- a/src/omniq/core/scripts/retry_failed.lua +++ b/src/omniq/core/scripts/retry_failed.lua @@ -30,14 +30,17 @@ end local base = derive_base(anchor) -local k_job = base .. ":job:" .. job_id -local k_wait = base .. ":wait" -local k_active = base .. ":active" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_job = base .. ":job:" .. job_id +local k_wait = base .. ":wait" +local k_active = base .. ":active" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_delayed= base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" if redis.call("EXISTS", k_job) ~= 1 then return {"ERR", "NO_JOB"} @@ -53,13 +56,17 @@ redis.call("ZREM", k_delayed, job_id) redis.call("LREM", k_wait, 0, job_id) redis.call("LREM", k_failed, 0, job_id) +redis.call("ZREM", k_idx_delayed, job_id) +redis.call("ZREM", k_idx_failed, job_id) + redis.call("HSET", k_job, "state", "wait", "attempt", "0", "updated_ms", tostring(now_ms), "lease_token", "", "lock_until_ms", "", - "due_ms", "" + "due_ms", "", + "failed_ms", "" ) local gid = redis.call("HGET", k_job, "gid") or "" @@ -76,6 +83,7 @@ if gid ~= "" then local k_glimit = base .. ":g:" .. gid .. ":limit" redis.call("RPUSH", k_gwait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) inc_group_waiting = 1 inc_waiting_total = 1 @@ -92,6 +100,7 @@ if gid ~= "" then end else redis.call("RPUSH", k_wait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) inc_waiting = 1 inc_waiting_total = 1 end diff --git a/src/omniq/core/scripts/retry_failed_batch.lua b/src/omniq/core/scripts/retry_failed_batch.lua index 96013aa..b46f501 100644 --- a/src/omniq/core/scripts/retry_failed_batch.lua +++ b/src/omniq/core/scripts/retry_failed_batch.lua @@ -31,13 +31,16 @@ end local base = derive_base(anchor) -local k_wait = base .. ":wait" -local k_active = base .. ":active" -local k_delayed = base .. ":delayed" -local k_failed = base .. ":failed" -local k_gready = base .. ":groups:ready" -local k_stats = base .. ":stats" -local k_queues = "omniq:queues" +local k_wait = base .. ":wait" +local k_active = base .. ":active" +local k_delayed = base .. ":delayed" +local k_failed = base .. ":failed" +local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" +local k_idx_wait = base .. ":idx:wait" +local k_idx_delayed = base .. ":idx:delayed" +local k_idx_failed = base .. ":idx:failed" local out = {} @@ -82,18 +85,21 @@ for i = 1, count do if st ~= "failed" then push(job_id, "ERR", "NOT_FAILED") else - redis.call("ZREM", k_active, job_id) redis.call("ZREM", k_delayed, job_id) redis.call("LREM", k_wait, 0, job_id) redis.call("LREM", k_failed, 0, job_id) + redis.call("ZREM", k_idx_delayed, job_id) + redis.call("ZREM", k_idx_failed, job_id) + redis.call("HSET", k_job, "state", "wait", "attempt", "0", "updated_ms", tostring(now_ms), "lease_token", "", "lock_until_ms", "", - "due_ms", "" + "due_ms", "", + "failed_ms", "" ) local gid = redis.call("HGET", k_job, "gid") or "" @@ -106,6 +112,7 @@ for i = 1, count do local k_glimit = base .. ":g:" .. gid .. ":limit" redis.call("RPUSH", k_gwait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) inc_group_waiting = inc_group_waiting + 1 inc_waiting_total = inc_waiting_total + 1 @@ -122,6 +129,7 @@ for i = 1, count do end else redis.call("RPUSH", k_wait, job_id) + redis.call("ZADD", k_idx_wait, now_ms, job_id) inc_waiting = inc_waiting + 1 inc_waiting_total = inc_waiting_total + 1 end diff --git a/src/omniq/exec.py b/src/omniq/exec.py index 5bb3df9..4086167 100644 --- a/src/omniq/exec.py +++ b/src/omniq/exec.py @@ -15,7 +15,7 @@ def publish( payload: Any, job_id: Optional[str] = None, max_attempts: int = 3, - timeout_ms: int = 60_000, + timeout_ms: int = 30_000, backoff_ms: int = 5_000, due_ms: int = 0, gid: Optional[str] = None, diff --git a/src/omniq/monitor.py b/src/omniq/monitor.py index 5187414..ef47ff2 100644 --- a/src/omniq/monitor.py +++ b/src/omniq/monitor.py @@ -1,251 +1,91 @@ -from dataclasses import dataclass -from typing import List, Optional - -from .helper import as_str, queue_base - -@dataclass(frozen=True) -class QueueCounts: - paused: bool - waiting: int - active: int - delayed: int - completed: int - failed: int - -@dataclass(frozen=True) -class GroupStatus: - gid: str - inflight: int - limit: int - -@dataclass(frozen=True) -class ActiveSample: - job_id: str - gid: str - lock_until_ms: int - attempt: int - -@dataclass(frozen=True) -class DelayedSample: - job_id: str - gid: str - due_ms: int - attempt: int - -@dataclass(frozen=True) -class FailedSample: - job_id: str - gid: str - attempt: int - max_attempts: int - failed_at_ms: int - last_error: str - -@dataclass(frozen=True) -class JobInfo: - job_id: str - state: str - gid: str - attempt: int - max_attempts: int - timeout_ms: int - backoff_ms: int - lease_token: str - lock_until_ms: int - due_ms: int - payload: str - last_error: str - updated_ms: int +from typing import Iterable, Optional + +from .monitor_core import QueueMonitorCore +from .monitor_models import ( + GroupReady, + GroupStatus, + JobInfo, + LaneJob, + LaneName, + QueueOverview, + QueueStats, +) class QueueMonitor: def __init__(self, uq): - self._uq = uq - self._r = ( - getattr(uq, "r", None) - or getattr(getattr(uq, "ops", None), "r", None) - or getattr(getattr(uq, "_ops", None), "r", None) + self._core = QueueMonitorCore(uq) + + def list_queues(self) -> list[str]: + return self._core.list_queues() + + def stats(self, queue: str) -> QueueStats: + return self._core.stats(queue) + + def stats_many(self, queues: Optional[Iterable[str]] = None) -> list[QueueStats]: + return self._core.stats_many(queues) + + def groups_ready( + self, + queue: str, + offset: int = 0, + limit: int = 200, + ) -> list[str]: + return self._core.groups_ready( + queue=queue, + offset=offset, + limit=limit, ) - if self._r is None: - raise ValueError("QueueMonitor needs redis access (inject from server, do not expose to UI callers).") - - def _base(self, queue: str) -> str: - return queue_base(queue) - - def counts(self, queue: str) -> QueueCounts: - base = self._base(queue) - r = self._r - - paused = r.exists(f"{base}:paused") == 1 - waiting = int(r.llen(f"{base}:wait") or 0) - active = int(r.zcard(f"{base}:active") or 0) - delayed = int(r.zcard(f"{base}:delayed") or 0) - completed = int(r.llen(f"{base}:completed") or 0) - failed = int(r.llen(f"{base}:failed") or 0) - - return QueueCounts( - paused=paused, - waiting=waiting, - active=active, - delayed=delayed, - completed=completed, - failed=failed, + def groups_ready_with_scores( + self, + queue: str, + offset: int = 0, + limit: int = 200, + ) -> list[GroupReady]: + return self._core.groups_ready_with_scores( + queue=queue, + offset=offset, + limit=limit, ) - def groups_ready(self, queue: str, limit: int = 200) -> List[str]: - base = self._base(queue) - r = self._r - limit = max(1, min(int(limit), 2000)) - try: - gids = r.zrange(f"{base}:groups:ready", 0, limit - 1) - return [as_str(g) for g in gids if g] - except Exception: - return [] - - def group_status(self, queue: str, gids: List[str], default_limit: int = 1) -> List[GroupStatus]: - base = self._base(queue) - r = self._r - - out: List[GroupStatus] = [] - for gid in gids: - gid_s = as_str(gid) - - inflight = int(as_str(r.get(f"{base}:g:{gid_s}:inflight")) or "0") - - raw = r.get(f"{base}:g:{gid_s}:limit") - gl = int(as_str(raw) or "0") - limit = gl if gl > 0 else int(default_limit) - - out.append(GroupStatus(gid=gid_s, inflight=inflight, limit=limit)) - return out - - def sample_active(self, queue: str, limit: int = 50) -> List[ActiveSample]: - base = self._base(queue) - r = self._r - limit = max(1, min(int(limit), 500)) - - job_ids = r.zrange(f"{base}:active", 0, limit - 1) - out: List[ActiveSample] = [] - - for jid in job_ids: - jid_s = as_str(jid) - k_job = f"{base}:job:{jid_s}" - gid, attempt = r.hmget(k_job, "gid", "attempt") - score = r.zscore(f"{base}:active", jid) or 0 - - out.append( - ActiveSample( - job_id=jid_s, - gid=as_str(gid), - lock_until_ms=int(score), - attempt=int(as_str(attempt) or "0"), - ) - ) - return out - - def sample_delayed(self, queue: str, limit: int = 50) -> List[DelayedSample]: - base = self._base(queue) - r = self._r - limit = max(1, min(int(limit), 500)) - - job_ids = r.zrange(f"{base}:delayed", 0, limit - 1) - out: List[DelayedSample] = [] - - for jid in job_ids: - jid_s = as_str(jid) - k_job = f"{base}:job:{jid_s}" - gid, attempt = r.hmget(k_job, "gid", "attempt") - due = r.zscore(f"{base}:delayed", jid) or 0 - - out.append( - DelayedSample( - job_id=jid_s, - gid=as_str(gid), - due_ms=int(due), - attempt=int(as_str(attempt) or "0"), - ) - ) - return out - - def sample_failed(self, queue: str, limit: int = 50) -> List[FailedSample]: - base = self._base(queue) - r = self._r - limit = max(1, min(int(limit), 500)) - - try: - job_ids = r.lrange(f"{base}:failed", 0, limit - 1) - except Exception: - return [] - - out: List[FailedSample] = [] - - for jid in job_ids: - jid_s = as_str(jid) - k_job = f"{base}:job:{jid_s}" - - try: - gid, attempt, max_attempts, last_error, last_error_ms, updated_ms = r.hmget( - k_job, - "gid", - "attempt", - "max_attempts", - "last_error", - "last_error_ms", - "updated_ms", - ) - except Exception: - gid = attempt = max_attempts = last_error = last_error_ms = updated_ms = None - - fam = int(as_str(last_error_ms) or "0") - if fam <= 0: - fam = int(as_str(updated_ms) or "0") - - out.append( - FailedSample( - job_id=jid_s, - gid=as_str(gid), - attempt=int(as_str(attempt) or "0"), - max_attempts=int(as_str(max_attempts) or "0"), - failed_at_ms=fam, - last_error=as_str(last_error), - ) - ) + def group_status( + self, + queue: str, + gids: list[str], + default_limit: int = 1, + ) -> list[GroupStatus]: + return self._core.group_status( + queue=queue, + gids=gids, + default_limit=default_limit, + ) - return out + def lane_page( + self, + queue: str, + lane: LaneName, + offset: int = 0, + limit: int = 25, + reverse: bool = False, + ) -> list[LaneJob]: + return self._core.lane_page( + queue=queue, + lane=lane, + offset=offset, + limit=limit, + reverse=reverse, + ) def get_job(self, queue: str, job_id: str) -> Optional[JobInfo]: - base = self._base(queue) - r = self._r - jid_s = as_str(job_id) - k_job = f"{base}:job:{jid_s}" - - if r.exists(k_job) != 1: - return None - - fields = [ - "state", "gid", "attempt", "max_attempts", "timeout_ms", "backoff_ms", - "lease_token", "lock_until_ms", "due_ms", "payload", "last_error", "updated_ms", - ] - - try: - vals = r.hmget(k_job, *fields) - except Exception: - return None - - m = {fields[i]: vals[i] for i in range(len(fields))} - - return JobInfo( - job_id=jid_s, - state=as_str(m["state"]), - gid=as_str(m["gid"]), - attempt=int(as_str(m["attempt"]) or "0"), - max_attempts=int(as_str(m["max_attempts"]) or "0"), - timeout_ms=int(as_str(m["timeout_ms"]) or "0"), - backoff_ms=int(as_str(m["backoff_ms"]) or "0"), - lease_token=as_str(m["lease_token"]), - lock_until_ms=int(as_str(m["lock_until_ms"]) or "0"), - due_ms=int(as_str(m["due_ms"]) or "0"), - payload=as_str(m["payload"]), - last_error=as_str(m["last_error"]), - updated_ms=int(as_str(m["updated_ms"]) or "0"), - ) \ No newline at end of file + return self._core.get_job(queue, job_id) + + def find_jobs( + self, + queue: str, + lane: LaneName, + job_ids: Iterable[str], + ) -> list[LaneJob]: + return self._core.find_jobs(queue, lane, job_ids) + + def overview(self, queue: str, samples_per_lane: int = 10) -> QueueOverview: + return self._core.overview(queue, samples_per_lane) \ No newline at end of file diff --git a/src/omniq/monitor_core.py b/src/omniq/monitor_core.py new file mode 100644 index 0000000..48b9e1f --- /dev/null +++ b/src/omniq/monitor_core.py @@ -0,0 +1,386 @@ +from typing import Any, Dict, Iterable, Optional + +from .helper import as_str, queue_base +from .monitor_models import ( + GroupReady, + GroupStatus, + JobInfo, + LaneJob, + LaneName, + QueueOverview, + QueueStats, +) + +class QueueMonitorCore: + QUEUE_REGISTRY = "omniq:queues" + + MAX_LIST_LIMIT = 25 + MAX_GROUP_LIMIT = 500 + + def __init__(self, uq: Any): + self._uq = uq + self._r = ( + getattr(uq, "r", None) + or getattr(getattr(uq, "ops", None), "r", None) + or getattr(getattr(uq, "_ops", None), "r", None) + ) + if self._r is None: + raise ValueError( + "QueueMonitor needs redis access (inject from server, do not expose to UI callers)." + ) + + def _base(self, queue: str) -> str: + return queue_base(queue) + + def _stats_key(self, base: str) -> str: + return f"{base}:stats" + + def _paused_key(self, base: str) -> str: + return f"{base}:paused" + + def _ready_key(self, base: str) -> str: + return f"{base}:groups:ready" + + def _job_key(self, base: str, job_id: str) -> str: + return f"{base}:job:{job_id}" + + def _idx_key(self, base: str, lane: LaneName) -> str: + return f"{base}:idx:{lane}" + + def _gwait_key(self, base: str, gid: str) -> str: + return f"{base}:g:{gid}:wait" + + def _ginflight_key(self, base: str, gid: str) -> str: + return f"{base}:g:{gid}:inflight" + + def _glimit_key(self, base: str, gid: str) -> str: + return f"{base}:g:{gid}:limit" + + @staticmethod + def _to_int(value: Any, default: int = 0) -> int: + s = as_str(value) + if s == "": + return default + try: + return int(float(s)) + except Exception: + return default + + @staticmethod + def _normalize_queue_name(base_or_queue: str) -> str: + value = (base_or_queue or "").strip() + if value.startswith("{") and value.endswith("}"): + return value[1:-1] + return value + + @staticmethod + def _decode_hash(raw: Dict[Any, Any]) -> Dict[str, Any]: + out: Dict[str, Any] = {} + for k, v in (raw or {}).items(): + out[as_str(k)] = v + return out + + def _clamp_list_limit(self, limit: int) -> int: + return max(1, min(int(limit), self.MAX_LIST_LIMIT)) + + def _clamp_group_limit(self, limit: int) -> int: + return max(1, min(int(limit), self.MAX_GROUP_LIMIT)) + + def _read_job_map(self, base: str, job_id: str) -> Optional[Dict[str, Any]]: + key = self._job_key(base, job_id) + try: + if self._r.exists(key) != 1: + return None + raw = self._r.hgetall(key) or {} + except Exception: + return None + return self._decode_hash(raw) + + def _is_group_ready(self, base: str, gid: str) -> bool: + try: + return self._r.zscore(self._ready_key(base), gid) is not None + except Exception: + return False + + def _job_info_from_map(self, job_id: str, m: Dict[str, Any]) -> JobInfo: + return JobInfo( + job_id=job_id, + state=as_str(m.get("state")), + gid=as_str(m.get("gid")), + attempt=self._to_int(m.get("attempt")), + max_attempts=self._to_int(m.get("max_attempts")), + timeout_ms=self._to_int(m.get("timeout_ms")), + backoff_ms=self._to_int(m.get("backoff_ms")), + lease_token=as_str(m.get("lease_token")), + lock_until_ms=self._to_int(m.get("lock_until_ms")), + due_ms=self._to_int(m.get("due_ms")), + payload=as_str(m.get("payload")), + last_error=as_str(m.get("last_error")), + last_error_ms=self._to_int(m.get("last_error_ms")), + created_ms=self._to_int(m.get("created_ms")), + updated_ms=self._to_int(m.get("updated_ms")), + queued_ms=self._to_int(m.get("queued_ms")), + first_started_ms=self._to_int(m.get("first_started_ms")), + last_started_ms=self._to_int(m.get("last_started_ms")), + completed_ms=self._to_int(m.get("completed_ms")), + failed_ms=self._to_int(m.get("failed_ms")), + ) + + def _lane_job_from_map( + self, + lane: LaneName, + job_id: str, + idx_score_ms: int, + m: Dict[str, Any], + ) -> LaneJob: + return LaneJob( + lane=lane, + job_id=job_id, + idx_score_ms=idx_score_ms, + state=as_str(m.get("state")), + gid=as_str(m.get("gid")), + attempt=self._to_int(m.get("attempt")), + max_attempts=self._to_int(m.get("max_attempts")), + due_ms=self._to_int(m.get("due_ms")), + lock_until_ms=self._to_int(m.get("lock_until_ms")), + queued_ms=self._to_int(m.get("queued_ms")), + first_started_ms=self._to_int(m.get("first_started_ms")), + last_started_ms=self._to_int(m.get("last_started_ms")), + completed_ms=self._to_int(m.get("completed_ms")), + failed_ms=self._to_int(m.get("failed_ms")), + updated_ms=self._to_int(m.get("updated_ms")), + last_error=as_str(m.get("last_error")), + ) + + def list_queues(self) -> list[str]: + try: + bases = self._r.smembers(self.QUEUE_REGISTRY) or [] + except Exception: + return [] + + names = [self._normalize_queue_name(as_str(x)) for x in bases if as_str(x)] + names.sort() + return names + + def stats(self, queue: str) -> QueueStats: + base = self._base(queue) + + try: + raw = self._r.hgetall(self._stats_key(base)) or {} + except Exception: + raw = {} + + stats_map = self._decode_hash(raw) + + try: + paused = self._r.exists(self._paused_key(base)) == 1 + except Exception: + paused = False + + waiting = self._to_int(stats_map.get("waiting")) + group_waiting = self._to_int(stats_map.get("group_waiting")) + waiting_total = self._to_int(stats_map.get("waiting_total")) + + if waiting_total <= 0 and (waiting > 0 or group_waiting > 0): + waiting_total = waiting + group_waiting + + return QueueStats( + queue=self._normalize_queue_name(queue), + paused=paused, + waiting=waiting, + group_waiting=group_waiting, + waiting_total=waiting_total, + active=self._to_int(stats_map.get("active")), + delayed=self._to_int(stats_map.get("delayed")), + failed=self._to_int(stats_map.get("failed")), + completed_kept=self._to_int(stats_map.get("completed_kept")), + groups_ready=self._to_int(stats_map.get("groups_ready")), + last_activity_ms=self._to_int(stats_map.get("last_activity_ms")), + last_enqueue_ms=self._to_int(stats_map.get("last_enqueue_ms")), + last_reserve_ms=self._to_int(stats_map.get("last_reserve_ms")), + last_finish_ms=self._to_int(stats_map.get("last_finish_ms")), + ) + + def stats_many(self, queues: Optional[Iterable[str]] = None) -> list[QueueStats]: + target = list(queues) if queues is not None else self.list_queues() + return [self.stats(q) for q in target] + + def groups_ready( + self, + queue: str, + offset: int = 0, + limit: int = 200, + ) -> list[str]: + rows = self.groups_ready_with_scores(queue=queue, offset=offset, limit=limit) + return [x.gid for x in rows] + + def groups_ready_with_scores( + self, + queue: str, + offset: int = 0, + limit: int = 200, + ) -> list[GroupReady]: + base = self._base(queue) + offset = max(0, int(offset)) + limit = self._clamp_group_limit(limit) + + try: + rows = self._r.zrange( + self._ready_key(base), + offset, + offset + limit - 1, + withscores=True, + ) + except Exception: + return [] + + return [ + GroupReady(gid=as_str(gid), score_ms=self._to_int(score)) + for gid, score in rows + if as_str(gid) + ] + + def group_status( + self, + queue: str, + gids: list[str], + default_limit: int = 1, + ) -> list[GroupStatus]: + base = self._base(queue) + default_limit = max(1, int(default_limit)) + + normalized_gids = [as_str(g) for g in gids if as_str(g)] + normalized_gids = normalized_gids[: self.MAX_GROUP_LIMIT] + + out: list[GroupStatus] = [] + for gid_s in normalized_gids: + try: + inflight = self._to_int(self._r.get(self._ginflight_key(base, gid_s))) + except Exception: + inflight = 0 + + try: + raw_limit = self._to_int(self._r.get(self._glimit_key(base, gid_s))) + except Exception: + raw_limit = 0 + + limit = raw_limit if raw_limit > 0 else default_limit + + try: + waiting_count = self._to_int(self._r.llen(self._gwait_key(base, gid_s))) + except Exception: + waiting_count = 0 + + out.append( + GroupStatus( + gid=gid_s, + inflight=inflight, + limit=limit, + ready=self._is_group_ready(base, gid_s), + waiting_count=waiting_count, + ) + ) + return out + + def lane_page( + self, + queue: str, + lane: LaneName, + offset: int = 0, + limit: int = 25, + reverse: bool = False, + ) -> list[LaneJob]: + base = self._base(queue) + offset = max(0, int(offset)) + limit = self._clamp_list_limit(limit) + key = self._idx_key(base, lane) + + try: + if reverse: + rows = self._r.zrevrange(key, offset, offset + limit - 1, withscores=True) + else: + rows = self._r.zrange(key, offset, offset + limit - 1, withscores=True) + except Exception: + return [] + + out: list[LaneJob] = [] + for raw_job_id, raw_score in rows: + job_id = as_str(raw_job_id) + if not job_id: + continue + + m = self._read_job_map(base, job_id) + if not m: + continue + + out.append( + self._lane_job_from_map( + lane=lane, + job_id=job_id, + idx_score_ms=self._to_int(raw_score), + m=m, + ) + ) + return out + + def get_job(self, queue: str, job_id: str) -> Optional[JobInfo]: + base = self._base(queue) + job_id = as_str(job_id) + if not job_id: + return None + m = self._read_job_map(base, job_id) + if not m: + return None + return self._job_info_from_map(job_id, m) + + def find_jobs( + self, + queue: str, + lane: LaneName, + job_ids: Iterable[str], + ) -> list[LaneJob]: + base = self._base(queue) + idx_key = self._idx_key(base, lane) + out: list[LaneJob] = [] + + for raw_job_id in job_ids: + job_id = as_str(raw_job_id) + if not job_id: + continue + + try: + score = self._r.zscore(idx_key, job_id) + except Exception: + score = None + + if score is None: + continue + + m = self._read_job_map(base, job_id) + if not m: + continue + + out.append( + self._lane_job_from_map( + lane=lane, + job_id=job_id, + idx_score_ms=self._to_int(score), + m=m, + ) + ) + return out + + def overview(self, queue: str, samples_per_lane: int = 10) -> QueueOverview: + samples_per_lane = self._clamp_list_limit(samples_per_lane) + + return QueueOverview( + stats=self.stats(queue), + ready_groups=self.groups_ready_with_scores( + queue, + limit=samples_per_lane, + ), + active=self.lane_page(queue, "active", limit=samples_per_lane), + delayed=self.lane_page(queue, "delayed", limit=samples_per_lane), + failed=self.lane_page(queue, "failed", limit=samples_per_lane), + completed=self.lane_page(queue, "completed", limit=samples_per_lane), + ) diff --git a/src/omniq/monitor_models.py b/src/omniq/monitor_models.py new file mode 100644 index 0000000..4f0eda9 --- /dev/null +++ b/src/omniq/monitor_models.py @@ -0,0 +1,85 @@ +from dataclasses import dataclass +from typing import Literal + +LaneName = Literal["wait", "active", "delayed", "failed", "completed"] + +@dataclass(frozen=True) +class QueueStats: + queue: str + paused: bool + waiting: int + group_waiting: int + waiting_total: int + active: int + delayed: int + failed: int + completed_kept: int + groups_ready: int + last_activity_ms: int + last_enqueue_ms: int + last_reserve_ms: int + last_finish_ms: int + +@dataclass(frozen=True) +class GroupReady: + gid: str + score_ms: int + +@dataclass(frozen=True) +class GroupStatus: + gid: str + inflight: int + limit: int + ready: bool + waiting_count: int + +@dataclass(frozen=True) +class LaneJob: + lane: LaneName + job_id: str + idx_score_ms: int + state: str + gid: str + attempt: int + max_attempts: int + due_ms: int + lock_until_ms: int + queued_ms: int + first_started_ms: int + last_started_ms: int + completed_ms: int + failed_ms: int + updated_ms: int + last_error: str + +@dataclass(frozen=True) +class JobInfo: + job_id: str + state: str + gid: str + attempt: int + max_attempts: int + timeout_ms: int + backoff_ms: int + lease_token: str + lock_until_ms: int + due_ms: int + payload: str + last_error: str + last_error_ms: int + created_ms: int + updated_ms: int + queued_ms: int + first_started_ms: int + last_started_ms: int + completed_ms: int + failed_ms: int + +@dataclass(frozen=True) +class QueueOverview: + stats: QueueStats + ready_groups: list[GroupReady] + active: list[LaneJob] + delayed: list[LaneJob] + failed: list[LaneJob] + completed: list[LaneJob] \ No newline at end of file diff --git a/src/omniq/types.py b/src/omniq/types.py index 9eeef79..0a8ba9b 100644 --- a/src/omniq/types.py +++ b/src/omniq/types.py @@ -30,6 +30,13 @@ class ReserveJob: lease_token: str AckFailResult = Tuple[Literal["RETRY", "FAILED"], Optional[int]] -BatchRemoveResult = List[Tuple[str, str, Optional[str]]] -BatchRetryFailedResult = List[Tuple[str, str, Optional[str]]] + +@dataclass(frozen=True) +class BatchResultItem: + job_id: str + status: str + reason: Optional[str] = None + +BatchRemoveResult = List[BatchResultItem] +BatchRetryFailedResult = List[BatchResultItem] ReserveResult = Union[None, ReservePaused, ReserveJob]