@celery_app.task(
    name="pyworkflow.run_data_retention",
    base=SingletonWorkflowTask,
    bind=True,
    queue="pyworkflow.default",
    release_lock_on_failure=True,
    lock_expiry=7200,  # 2h safety net: lock auto-expires if a run wedges
)
def run_data_retention_task(self: SingletonWorkflowTask) -> dict[str, Any]:
    """
    Periodic task: delete workflow runs older than ``data_retention_days``.

    Runs as a singleton (at most one instance at a time, enforced by the
    SingletonWorkflowTask lock). No-op when ``data_retention_days`` or
    storage is not configured.

    Returns:
        Summary dict: ``deleted`` (number of runs removed) and ``skipped``
        (True when retention/storage is not configured).
    """
    # Import everything this body needs locally so the task does not depend
    # on module-level datetime imports (the original only imported timedelta
    # here yet used datetime.now(UTC) below).
    from datetime import UTC, datetime, timedelta

    from pyworkflow.config import get_config
    from pyworkflow.storage.config import storage_to_config

    config = get_config()
    if config.data_retention_days is None:
        logger.debug("Data retention not configured; skipping.")
        return {"deleted": 0, "skipped": True}

    storage = config.storage
    if storage is None:
        logger.warning("No storage configured; skipping data retention.")
        return {"deleted": 0, "skipped": True}

    # Runs whose updated_at is strictly before this cutoff are eligible.
    cutoff = datetime.now(UTC) - timedelta(days=config.data_retention_days)
    logger.info(
        "Running data retention: deleting runs updated before {}",
        cutoff.isoformat(),
    )
    storage_config = storage_to_config(storage)
    backend = _get_storage_backend(storage_config)
    count = run_async(backend.delete_old_runs(cutoff))
    logger.info("Data retention complete: deleted {} runs", count)
    return {"deleted": count, "skipped": False}
@abstractmethod
async def delete_old_runs(self, older_than: datetime) -> int:
    """
    Remove terminal-state workflow runs last touched before *older_than*.

    A run is eligible when its status is one of COMPLETED, FAILED,
    CANCELLED, CONTINUED_AS_NEW, or INTERRUPTED and its ``updated_at``
    timestamp is strictly earlier than the cutoff. Implementations must
    also purge the run's associated events, steps, hooks, and
    cancellation flags.

    Args:
        older_than: Cutoff; runs with ``updated_at`` before this instant
            are deleted.

    Returns:
        The number of workflow runs removed.
    """
    pass
async def delete_old_runs(self, older_than: datetime) -> int:
    """
    Delete terminal runs (and all related data) updated before *older_than*.

    Walks the runs_by_status index for each terminal status, checks each
    run's updated_at against the cutoff, then deletes the run's child rows,
    its status-index entry, and finally the run row itself.

    Returns:
        Number of workflow runs deleted.
    """
    session = self._ensure_connected()
    terminal = ["completed", "failed", "cancelled", "continued_as_new", "interrupted"]
    # Child tables keyed by run_id; deleted before the run row so a partial
    # failure leaves the run discoverable for a retry.
    related_tables = ("events", "steps", "hooks", "cancellation_flags")
    count = 0

    for status_val in terminal:
        rows = session.execute(
            SimpleStatement(
                "SELECT run_id FROM runs_by_status WHERE status = %s",
                consistency_level=self.read_consistency,
            ),
            (status_val,),
        )
        for row in rows:
            run_id = row.run_id
            # Per-run lookup to apply the updated_at cutoff.
            run_row = session.execute(
                SimpleStatement(
                    "SELECT updated_at FROM workflow_runs WHERE run_id = %s",
                    consistency_level=self.read_consistency,
                ),
                (run_id,),
            ).one()
            # Skip runs that are missing, have a NULL updated_at (guards a
            # TypeError on None comparison), or are newer than the cutoff.
            if (
                run_row is None
                or run_row.updated_at is None
                or run_row.updated_at >= older_than
            ):
                continue
            for table in related_tables:
                session.execute(
                    SimpleStatement(
                        f"DELETE FROM {table} WHERE run_id = %s",
                        consistency_level=self.write_consistency,
                    ),
                    (run_id,),
                )
            # BUG FIX: also remove the status-index entry; without this,
            # deleted runs accumulate in runs_by_status and every retention
            # cycle rescans (and row-by-row skips) them forever.
            # NOTE(review): assumes runs_by_status PK is (status, run_id) —
            # confirm against the table schema.
            session.execute(
                SimpleStatement(
                    "DELETE FROM runs_by_status WHERE status = %s AND run_id = %s",
                    consistency_level=self.write_consistency,
                ),
                (status_val, run_id),
            )
            session.execute(
                SimpleStatement(
                    "DELETE FROM workflow_runs WHERE run_id = %s",
                    consistency_level=self.write_consistency,
                ),
                (run_id,),
            )
            count += 1

    return count
async def delete_old_runs(self, older_than: datetime) -> int:
    """
    Delete terminal runs (and all related items) updated before *older_than*.

    Finds candidate run_ids via the GSI1 status index, then removes every
    item under PK=RUN#{run_id} plus the run's cancellation-flag item, in
    batches of 25 (the BatchWriteItem limit).

    Returns:
        Number of workflow runs deleted.
    """
    import asyncio

    terminal = ["completed", "failed", "cancelled", "continued_as_new", "interrupted"]
    cutoff_iso = older_than.isoformat()
    count = 0

    async with self._get_client() as client:
        # Collect matching run ids from the status GSI, paging through
        # LastEvaluatedKey. The FilterExpression is applied after the key
        # match, so pages may come back empty; only stop when the index
        # pagination is exhausted.
        run_ids: list[str] = []
        for status_val in terminal:
            params: dict[str, Any] = {
                "TableName": self.table_name,
                "IndexName": "GSI1",
                "KeyConditionExpression": "GSI1PK = :pk AND begins_with(GSI1SK, :status)",
                "FilterExpression": "updated_at < :cutoff",
                "ExpressionAttributeValues": {
                    ":pk": {"S": "RUNS"},
                    ":status": {"S": f"{status_val}#"},
                    ":cutoff": {"S": cutoff_iso},
                },
                "ProjectionExpression": "run_id",
            }
            while True:
                response = await client.query(**params)
                for item in response.get("Items", []):
                    run_id_val = self._item_to_dict(item).get("run_id")
                    if run_id_val:
                        run_ids.append(run_id_val)
                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                params["ExclusiveStartKey"] = last_key

        # For each run, delete all items with PK=RUN#{run_id} + cancel flag.
        for run_id in run_ids:
            pk_val = f"RUN#{run_id}"
            all_items: list[dict[str, Any]] = []
            query_params: dict[str, Any] = {
                "TableName": self.table_name,
                "KeyConditionExpression": "PK = :pk",
                "ExpressionAttributeValues": {":pk": {"S": pk_val}},
                "ProjectionExpression": "PK, SK",
            }
            while True:
                resp = await client.query(**query_params)
                all_items.extend(resp.get("Items", []))
                lk = resp.get("LastEvaluatedKey")
                if not lk:
                    break
                query_params["ExclusiveStartKey"] = lk

            # Deleting a nonexistent key is a no-op, so the cancellation
            # flag item can be added unconditionally.
            all_items.append({"PK": {"S": f"CANCEL#{run_id}"}, "SK": {"S": "#FLAG"}})

            # Batch delete in groups of 25 (DynamoDB limit). BUG FIX:
            # BatchWriteItem may return UnprocessedItems under throttling;
            # retry them until drained instead of silently dropping deletes.
            for i in range(0, len(all_items), 25):
                batch = all_items[i : i + 25]
                request_items: dict[str, Any] = {
                    self.table_name: [
                        {"DeleteRequest": {"Key": {"PK": item["PK"], "SK": item["SK"]}}}
                        for item in batch
                    ]
                }
                while request_items:
                    resp = await client.batch_write_item(RequestItems=request_items)
                    request_items = resp.get("UnprocessedItems") or {}
                    if request_items:
                        # Brief backoff before retrying throttled deletes.
                        await asyncio.sleep(0.05)

            count += 1

    return count
async def delete_old_runs(self, older_than: datetime) -> int:
    """Delete terminal runs updated before *older_than*; return the count."""
    terminal = {
        RunStatus.COMPLETED,
        RunStatus.FAILED,
        RunStatus.CANCELLED,
        RunStatus.CONTINUED_AS_NEW,
        RunStatus.INTERRUPTED,
    }
    with self._lock:
        doomed = [
            rid
            for rid, run in self._runs.items()
            if run.status in terminal and run.updated_at < older_than
        ]
        for rid in doomed:
            victim = self._runs.pop(rid)
            # Drop per-run event history and sequence counter.
            self._events.pop(rid, None)
            self._event_sequences.pop(rid, None)
            if victim.idempotency_key:
                self._idempotency_index.pop(victim.idempotency_key, None)
            # Steps are keyed by step_id, so filter by owning run.
            for sid in [s_id for s_id, step in self._steps.items() if step.run_id == rid]:
                del self._steps[sid]
            # Hooks are keyed by (run_id, ...); drop token-index entries too.
            for key in [k for k in self._hooks if k[0] == rid]:
                removed = self._hooks.pop(key)
                self._token_index.pop(removed.token, None)
            self._cancellation_flags.pop(rid, None)
        return len(doomed)
async def delete_old_runs(self, older_than: datetime) -> int:
    """
    Delete terminal runs (and all related data) updated before *older_than*.

    Deletes child rows (events, steps, hooks, cancellation_flags) via a
    subquery on workflow_runs, then deletes the run rows themselves; commits
    once at the end.

    Returns:
        Number of workflow runs deleted.
    """
    pool = self._ensure_connected()
    terminal = ("completed", "failed", "cancelled", "continued_as_new", "interrupted")
    # BUG FIX: ",".join("%s" * n) joined the *characters* of "%s%s%s%s%s"
    # and produced "%,s,%,s,..." — invalid SQL that fails at execution.
    # Join a list of placeholder strings instead.
    placeholders = ",".join(["%s"] * len(terminal))
    subq = (
        f"SELECT run_id FROM workflow_runs WHERE status IN ({placeholders}) AND updated_at < %s"
    )
    params = (*terminal, older_than)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Child tables first, while the parent rows still satisfy the subquery.
            await cur.execute(f"DELETE FROM events WHERE run_id IN ({subq})", params)
            await cur.execute(f"DELETE FROM steps WHERE run_id IN ({subq})", params)
            await cur.execute(f"DELETE FROM hooks WHERE run_id IN ({subq})", params)
            await cur.execute(
                f"DELETE FROM cancellation_flags WHERE run_id IN ({subq})", params
            )
            await cur.execute(
                f"DELETE FROM workflow_runs WHERE status IN ({placeholders}) AND updated_at < %s",
                params,
            )
            # rowcount of the final statement = number of runs removed.
            count = cur.rowcount
        await conn.commit()
    return count if count is not None else 0
async def delete_old_runs(self, older_than: datetime) -> int:
    """Delete terminal runs (and all related rows) updated before *older_than*."""
    db = self._ensure_connected()
    terminal = ("completed", "failed", "cancelled", "continued_as_new", "interrupted")
    # "?" is a single character, so joining the characters of "?????" yields
    # "?,?,?,?,?" — one placeholder per status.
    marks = ",".join("?" * len(terminal))
    subq = (
        f"SELECT run_id FROM workflow_runs WHERE status IN ({marks}) AND updated_at < ?"
    )
    args = (*terminal, older_than.isoformat())
    async with db.cursor() as cur:
        # Purge child rows first, while the parent rows still match the subquery.
        for table in ("events", "steps", "hooks", "cancellation_flags"):
            await cur.execute(f"DELETE FROM {table} WHERE run_id IN ({subq})", args)
        await cur.execute(
            f"DELETE FROM workflow_runs WHERE status IN ({marks}) AND updated_at < ?",
            args,
        )
        # rowcount of the final statement = number of runs removed.
        deleted = cur.rowcount
    await db.commit()
    return deleted if deleted is not None else 0
RunStatus.COMPLETED, old_time) + await storage.create_run(run) + + count = await storage.delete_old_runs(cutoff) + + assert count == 1 + assert await storage.get_run("run-old-completed") is None + + @pytest.mark.asyncio + async def test_does_not_delete_recent_terminal_runs(self, storage): + """Terminal runs newer than cutoff are kept.""" + recent_time = datetime.now(UTC) - timedelta(days=1) + cutoff = datetime.now(UTC) - timedelta(days=5) + + run = _make_run("run-recent", RunStatus.COMPLETED, recent_time) + await storage.create_run(run) + + count = await storage.delete_old_runs(cutoff) + + assert count == 0 + assert await storage.get_run("run-recent") is not None + + @pytest.mark.asyncio + async def test_does_not_delete_active_runs(self, storage): + """Active/non-terminal runs are never deleted even if old.""" + old_time = datetime.now(UTC) - timedelta(days=30) + cutoff = datetime.now(UTC) - timedelta(days=5) + + for status in (RunStatus.RUNNING, RunStatus.PENDING, RunStatus.SUSPENDED): + run = _make_run(f"run-{status.value}", status, old_time) + await storage.create_run(run) + + count = await storage.delete_old_runs(cutoff) + + assert count == 0 + for status in (RunStatus.RUNNING, RunStatus.PENDING, RunStatus.SUSPENDED): + assert await storage.get_run(f"run-{status.value}") is not None + + @pytest.mark.asyncio + async def test_deletes_all_terminal_statuses(self, storage): + """All terminal statuses are eligible for deletion.""" + old_time = datetime.now(UTC) - timedelta(days=10) + cutoff = datetime.now(UTC) - timedelta(days=5) + + terminal_statuses = [ + RunStatus.COMPLETED, + RunStatus.FAILED, + RunStatus.CANCELLED, + RunStatus.CONTINUED_AS_NEW, + RunStatus.INTERRUPTED, + ] + for status in terminal_statuses: + run = _make_run(f"run-{status.value}", status, old_time) + await storage.create_run(run) + + count = await storage.delete_old_runs(cutoff) + + assert count == len(terminal_statuses) + for status in terminal_statuses: + assert await 
storage.get_run(f"run-{status.value}") is None + + @pytest.mark.asyncio + async def test_returns_zero_when_nothing_to_delete(self, storage): + """Returns 0 when no runs match the criteria.""" + count = await storage.delete_old_runs(datetime.now(UTC)) + assert count == 0 + + @pytest.mark.asyncio + async def test_deletes_related_events(self, storage): + """Events belonging to deleted runs are also removed.""" + from pyworkflow.engine.events import Event, EventType + + old_time = datetime.now(UTC) - timedelta(days=10) + cutoff = datetime.now(UTC) - timedelta(days=5) + + run = _make_run("run-with-events", RunStatus.COMPLETED, old_time) + await storage.create_run(run) + + event = Event( + run_id="run-with-events", + type=EventType.WORKFLOW_STARTED, + data={}, + ) + await storage.record_event(event) + assert len(await storage.get_events("run-with-events")) == 1 + + await storage.delete_old_runs(cutoff) + + assert await storage.get_events("run-with-events") == [] + + @pytest.mark.asyncio + async def test_deletes_related_steps(self, storage): + """Steps belonging to deleted runs are also removed.""" + from datetime import UTC, datetime + + from pyworkflow.storage.schemas import StepExecution, StepStatus + + old_time = datetime.now(UTC) - timedelta(days=10) + cutoff = datetime.now(UTC) - timedelta(days=5) + + run = _make_run("run-with-steps", RunStatus.COMPLETED, old_time) + await storage.create_run(run) + + step = StepExecution( + step_id="step-1", + run_id="run-with-steps", + step_name="my_step", + status=StepStatus.COMPLETED, + created_at=old_time, + updated_at=old_time, + ) + await storage.create_step(step) + assert await storage.get_step("step-1") is not None + + await storage.delete_old_runs(cutoff) + + assert await storage.get_step("step-1") is None + + @pytest.mark.asyncio + async def test_does_not_affect_other_runs(self, storage): + """Deleting old runs does not affect unrelated runs.""" + old_time = datetime.now(UTC) - timedelta(days=10) + recent_time = 
class TestRetentionConfig:
    """Configuration loading for data_retention_days (env var and configure())."""

    def setup_method(self):
        # Start every test from pristine config state.
        from pyworkflow.config import reset_config

        reset_config()

    def teardown_method(self):
        from pyworkflow.config import reset_config

        reset_config()
        # Remove any env override a test left behind.
        os.environ.pop("PYWORKFLOW_DATA_RETENTION_DAYS", None)

    def test_default_is_none(self):
        """With no overrides, retention is disabled (keep forever)."""
        from pyworkflow.config import get_config

        assert get_config().data_retention_days is None

    def test_loaded_from_env_var(self):
        """PYWORKFLOW_DATA_RETENTION_DAYS is parsed as an int."""
        os.environ["PYWORKFLOW_DATA_RETENTION_DAYS"] = "30"

        from pyworkflow.config import _config_from_env_and_yaml

        loaded = _config_from_env_and_yaml()
        assert loaded.data_retention_days == 30

    def test_configure_accepts_data_retention_days(self):
        """configure() exposes data_retention_days as a keyword."""
        import pyworkflow
        from pyworkflow.config import get_config

        pyworkflow.configure(data_retention_days=90)
        assert get_config().data_retention_days == 90

    def test_configure_rejects_unknown_key(self):
        """Unknown keywords still raise ValueError."""
        import pyworkflow

        with pytest.raises(ValueError, match="Unknown config option"):
            pyworkflow.configure(nonexistent_option=42)