Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions printer_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import os
import re
import ssl
import logging
import pathlib
Expand Down Expand Up @@ -102,9 +103,12 @@ def normalize_filename(filename: str) -> str:
"""Normalize filename for comparison (no path, no extension, lowercase)"""
if not filename:
return ""
base = os.path.basename(filename)
base = os.path.basename(filename).strip()
name_no_ext = os.path.splitext(base)[0]
return name_no_ext.strip().lower()
cleaned = re.sub(r"[^0-9a-zA-Z]+", "-", name_no_ext).strip("-_").lower()
# Collapse multiple separators and ensure deterministic formatting
cleaned = re.sub(r"[-_]+", "-", cleaned)
return cleaned

def files_match(job_filename: str, printer_filename: str) -> bool:
"""
Expand Down
193 changes: 155 additions & 38 deletions queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
PROCESS_JSON = os.environ.get("BAMBU_PROCESS_JSON", str(PROFILES_DIR / "process.json"))
FILAMENT_JSON = os.environ.get("BAMBU_FILAMENT_JSON", str(PROFILES_DIR / "filament.json"))

AUTO_UPLOAD = os.environ.get("AUTO_UPLOAD", "true").lower() in ("true", "1", "yes")
AUTO_UPLOAD = os.environ.get("AUTO_UPLOAD", "true").lower() in ("true", "1", "yes")
ASSIGN_TO_FINISH = os.environ.get("ASSIGN_TO_FINISH", "false").lower() in ("true", "1", "yes")

# ============ Database ============
def ensure_dirs():
Expand Down Expand Up @@ -814,10 +815,10 @@ async def api_queue_upload(

# ============ Printer Endpoints ============
@app.get("/api/printers/available")
def api_printers_available(
tag: Optional[str] = Query(None),
include_disabled: bool = Query(False, description="Include printers with autoprint=false")
):
def api_printers_available(
tag: Optional[str] = Query(None),
include_disabled: bool = Query(False, description="Include printers with autoprint=false")
):
"""Get available printers with current status"""
try:
cfg = get_printers_config()
Expand Down Expand Up @@ -872,39 +873,155 @@ def sort_key(x):

except Exception as e:
logger.error(f"Error fetching printers: {e}", exc_info=True)
return {"printers": []}

@app.get("/api/queue/waiting")
def api_queue_waiting():
"""Number of jobs waiting for an IDLE printer"""
con = db_conn()
cur = con.cursor()

# PENDING jobs (not yet assigned)
pending = cur.execute(
"SELECT COUNT(*) as cnt FROM queue_jobs WHERE status='PENDING'"
).fetchone()["cnt"]

# READY jobs (assigned but not uploaded yet)
ready = cur.execute(
"SELECT COUNT(*) as cnt FROM queue_jobs WHERE status='READY'"
).fetchone()["cnt"]

# Per tag breakdown
tag_rows = cur.execute("""
SELECT job_tag, COUNT(*) as cnt
FROM queue_jobs
WHERE status='PENDING' AND job_tag IS NOT NULL
GROUP BY job_tag
""").fetchall()

con.close()

return {
"pending_assignment": pending,
"ready_for_upload": ready,
"by_tag": {r["job_tag"]: r["cnt"] for r in tag_rows}
}
return {"printers": []}


def _normalize_tag(value: Optional[str]) -> str:
return (value or "").strip().lower()


def _normalize_tags(tags: Any) -> List[str]:
if isinstance(tags, str):
return [t.strip().lower() for t in tags.split(",") if t.strip()]
if isinstance(tags, (list, tuple, set)):
out = []
for item in tags:
text = str(item).strip()
if text:
out.append(text.lower())
return out
return []


def _available_printer_slots() -> List[Dict[str, Any]]:
"""Return printers that are eligible to take a new job right now."""
cfg = get_printers_config()
status_data = get_printer_status()
status_map = {}
for item in status_data if isinstance(status_data, list) else []:
if not isinstance(item, dict):
continue
device_id = item.get("device_id")
if not device_id:
continue
status_map[device_id] = str(item.get("status", "UNKNOWN")).upper()

con = db_conn()
cur = con.cursor()
rows = cur.execute(
"""
SELECT device_id, COUNT(*) as cnt
FROM queue_jobs
WHERE status IN ('READY','UPLOADING','PRINTING')
GROUP BY device_id
"""
).fetchall()
con.close()
queue_loads = {r["device_id"]: r["cnt"] for r in rows}

eligible_statuses = {"IDLE"}
if ASSIGN_TO_FINISH:
eligible_statuses.add("FINISH")

slots: List[Dict[str, Any]] = []
for printer in cfg:
if not printer.get("autoprint"):
continue
device_id = printer.get("device_id")
if not device_id:
continue

if queue_loads.get(device_id, 0) > 0:
continue

status_value = status_map.get(device_id)
if not status_value or status_value not in eligible_statuses:
continue

slots.append(
{
"device_id": device_id,
"name": printer.get("name") or device_id,
"tags": _normalize_tags(printer.get("tags", [])),
}
)

slots.sort(key=lambda item: item["name"].lower())
return slots


def _simulate_pending_assignment(pending_rows: List[sqlite3.Row], slots: List[Dict[str, Any]]) -> Tuple[int, Dict[str, int], int]:
"""
Simulate how many pending jobs can be assigned immediately given the
currently available printer slots.
Returns a tuple of (assignable_count, waiting_by_tag, waiting_without_tag).
"""

available = list(slots)
assignable = 0
waiting_by_tag: Dict[str, int] = {}
waiting_without_tag = 0

for row in pending_rows:
raw_tag = row["job_tag"]
normalized_tag = _normalize_tag(raw_tag)

candidate_index = None
for idx, slot in enumerate(available):
if normalized_tag and normalized_tag not in slot["tags"]:
continue
candidate_index = idx
break

if candidate_index is not None:
assignable += 1
available.pop(candidate_index)
continue

if raw_tag:
waiting_by_tag[raw_tag] = waiting_by_tag.get(raw_tag, 0) + 1
else:
waiting_without_tag += 1

return assignable, waiting_by_tag, waiting_without_tag


@app.get("/api/queue/waiting")
def api_queue_waiting():
"""Number of jobs waiting for an IDLE printer"""
con = db_conn()
cur = con.cursor()

pending_rows = cur.execute(
"""
SELECT id, job_tag
FROM queue_jobs
WHERE status='PENDING'
ORDER BY created_at ASC
"""
).fetchall()

# READY jobs (assigned but not uploaded yet)
ready = cur.execute(
"SELECT COUNT(*) as cnt FROM queue_jobs WHERE status='READY'"
).fetchone()["cnt"]

con.close()

slots = _available_printer_slots()
assignable, waiting_by_tag, waiting_without_tag = _simulate_pending_assignment(pending_rows, slots)
total_pending = len(pending_rows)
waiting_total = max(total_pending - assignable, 0)

return {
"pending_assignment": total_pending,
"ready_for_upload": ready,
"pending_assignable": assignable,
"pending_waiting": waiting_total,
"untagged_waiting": waiting_without_tag,
"by_tag": waiting_by_tag,
"available_printers": [slot["device_id"] for slot in slots],
}

# ============ Startup & Shutdown ============
@app.on_event("startup")
Expand Down
Loading