Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pyworkflow/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"""

import os
from datetime import timedelta
from typing import Any

from celery import Celery
Expand Down Expand Up @@ -343,8 +344,14 @@ def create_celery_app(
# Monitoring
"worker_send_task_events": True,
"task_send_sent_event": True,
# Beat scheduler (for sleep resumption)
"beat_schedule": {},
# Beat scheduler (for sleep resumption and periodic tasks)
"beat_schedule": {
"pyworkflow-data-retention": {
"task": "pyworkflow.run_data_retention",
"schedule": timedelta(hours=24),
"options": {"queue": "pyworkflow.default"},
},
},
# Logging
"worker_log_format": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
"worker_task_log_format": "[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s",
Expand Down
42 changes: 42 additions & 0 deletions pyworkflow/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2749,3 +2749,45 @@ async def _handle_continue_as_new_celery(
)

return new_run_id


@celery_app.task(
    name="pyworkflow.run_data_retention",
    base=SingletonWorkflowTask,
    bind=True,
    queue="pyworkflow.default",
    release_lock_on_failure=True,
    lock_expiry=7200,  # 2h safety net
)
def run_data_retention_task(self: SingletonWorkflowTask) -> dict[str, Any]:
    """
    Periodic singleton task that purges workflow runs past the retention window.

    Returns a summary dict: {"deleted": <count>, "skipped": <bool>}.
    No-ops (skipped=True) when data_retention_days or storage is unconfigured.
    """
    from datetime import timedelta

    from pyworkflow.config import get_config
    from pyworkflow.storage.config import storage_to_config

    skipped_result = {"deleted": 0, "skipped": True}

    cfg = get_config()
    retention_days = cfg.data_retention_days
    if retention_days is None:
        logger.debug("Data retention not configured; skipping.")
        return skipped_result

    if cfg.storage is None:
        logger.warning("No storage configured; skipping data retention.")
        return skipped_result

    # Anything last updated before this instant is eligible for deletion.
    cutoff = datetime.now(UTC) - timedelta(days=retention_days)
    logger.info(
        "Running data retention: deleting runs updated before {}",
        cutoff.isoformat(),
    )
    backend = _get_storage_backend(storage_to_config(cfg.storage))
    deleted = run_async(backend.delete_old_runs(cutoff))
    logger.info("Data retention complete: deleted {} runs", deleted)
    return {"deleted": deleted, "skipped": False}
19 changes: 19 additions & 0 deletions pyworkflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
PYWORKFLOW_CELERY_BROKER: Celery broker URL
PYWORKFLOW_CELERY_RESULT_BACKEND: Celery result backend URL
PYWORKFLOW_RUNTIME: Default runtime (local, celery)
PYWORKFLOW_DATA_RETENTION_DAYS: Days to retain completed/failed/cancelled runs (unset = keep forever)
"""

import os
Expand Down Expand Up @@ -161,6 +162,9 @@ class PyWorkflowConfig:
celery_broker: str | None = None
aws_region: str | None = None

# Data retention policy
data_retention_days: int | None = None # None = keep forever

# Event limit settings (WARNING: Do not modify unless you understand the implications)
# These limits prevent runaway workflows from consuming excessive resources
event_soft_limit: int = 10_000 # Log warning at this count
Expand Down Expand Up @@ -196,11 +200,21 @@ def _config_from_env_and_yaml() -> PyWorkflowConfig:
celery_config = yaml_config.get("celery", {})
celery_broker = os.getenv("PYWORKFLOW_CELERY_BROKER") or celery_config.get("broker")

# Data retention: env var > yaml > None (keep forever)
retention_env = os.getenv("PYWORKFLOW_DATA_RETENTION_DAYS")
if retention_env is not None:
data_retention_days: int | None = int(retention_env)
elif yaml_config.get("retention_days") is not None:
data_retention_days = int(yaml_config["retention_days"])
else:
data_retention_days = None

return PyWorkflowConfig(
default_runtime=runtime,
default_durable=durable,
storage=storage,
celery_broker=celery_broker,
data_retention_days=data_retention_days,
)


Expand Down Expand Up @@ -348,11 +362,16 @@ def configure_from_yaml(path: str | Path, discover: bool = True) -> None:
celery_config = yaml_config.get("celery", {})
celery_broker = celery_config.get("broker")

# Data retention
retention_raw = yaml_config.get("retention_days")
data_retention_days: int | None = int(retention_raw) if retention_raw is not None else None

_config = PyWorkflowConfig(
default_runtime=runtime,
default_durable=durable,
storage=storage,
celery_broker=celery_broker,
data_retention_days=data_retention_days,
)
_config_loaded_from_yaml = True

Expand Down
16 changes: 16 additions & 0 deletions pyworkflow/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ async def disconnect(self) -> None:
"""
pass

    @abstractmethod
    async def delete_old_runs(self, older_than: datetime) -> int:
        """
        Delete workflow runs in terminal states last updated before `older_than`.

        Terminal states: COMPLETED, FAILED, CANCELLED, CONTINUED_AS_NEW, INTERRUPTED.
        Associated events, steps, hooks, and cancellation flags are deleted too.
        Runs in non-terminal states are never touched, regardless of age.

        Args:
            older_than: Delete runs where updated_at < older_than.
                NOTE(review): callers pass an aware (UTC) datetime; backends
                compare it directly against stored timestamps, so stored
                values are assumed comparable — confirm per backend.

        Returns:
            Number of workflow runs deleted
        """
        pass

async def health_check(self) -> bool:
"""
Check if storage backend is healthy and accessible.
Expand Down
66 changes: 66 additions & 0 deletions pyworkflow/storage/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -1709,6 +1709,72 @@ async def remove_running_run(self, schedule_id: str, run_id: str) -> None:
schedule.updated_at = datetime.now(UTC)
await self.update_schedule(schedule)

async def delete_old_runs(self, older_than: datetime) -> int:
"""Delete terminal runs (and all related data) updated before older_than."""
session = self._ensure_connected()
terminal = ["completed", "failed", "cancelled", "continued_as_new", "interrupted"]
count = 0

for status_val in terminal:
rows = session.execute(
SimpleStatement(
"SELECT run_id FROM runs_by_status WHERE status = %s",
consistency_level=self.read_consistency,
),
(status_val,),
)
for row in rows:
run_id = row.run_id
# Fetch run to check updated_at
run_row = session.execute(
SimpleStatement(
"SELECT updated_at FROM workflow_runs WHERE run_id = %s",
consistency_level=self.read_consistency,
),
(run_id,),
).one()
if run_row is None or run_row.updated_at >= older_than:
continue
# Delete all related data
session.execute(
SimpleStatement(
"DELETE FROM events WHERE run_id = %s",
consistency_level=self.write_consistency,
),
(run_id,),
)
session.execute(
SimpleStatement(
"DELETE FROM steps WHERE run_id = %s",
consistency_level=self.write_consistency,
),
(run_id,),
)
session.execute(
SimpleStatement(
"DELETE FROM hooks WHERE run_id = %s",
consistency_level=self.write_consistency,
),
(run_id,),
)
session.execute(
SimpleStatement(
"DELETE FROM cancellation_flags WHERE run_id = %s",
consistency_level=self.write_consistency,
),
(run_id,),
)
session.execute(
SimpleStatement(
"DELETE FROM workflow_runs WHERE run_id = %s",
consistency_level=self.write_consistency,
),
(run_id,),
)
count += 1

return count

# Helper methods for converting Cassandra rows to domain objects

def _row_to_workflow_run(self, row: Any) -> WorkflowRun:
Expand Down
70 changes: 70 additions & 0 deletions pyworkflow/storage/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,76 @@ async def remove_running_run(self, schedule_id: str, run_id: str) -> None:
schedule.updated_at = datetime.now(UTC)
await self.update_schedule(schedule)

async def delete_old_runs(self, older_than: datetime) -> int:
"""Delete terminal runs (and all related items) updated before older_than."""
terminal = ["completed", "failed", "cancelled", "continued_as_new", "interrupted"]
cutoff_iso = older_than.isoformat()
count = 0

async with self._get_client() as client:
# Scan for matching runs using GSI1 (status-based)
run_ids: list[str] = []
for status_val in terminal:
params: dict[str, Any] = {
"TableName": self.table_name,
"IndexName": "GSI1",
"KeyConditionExpression": "GSI1PK = :pk AND begins_with(GSI1SK, :status)",
"FilterExpression": "updated_at < :cutoff",
"ExpressionAttributeValues": {
":pk": {"S": "RUNS"},
":status": {"S": f"{status_val}#"},
":cutoff": {"S": cutoff_iso},
},
"ProjectionExpression": "run_id",
}
while True:
response = await client.query(**params)
for item in response.get("Items", []):
run_id_val = self._item_to_dict(item).get("run_id")
if run_id_val:
run_ids.append(run_id_val)
last_key = response.get("LastEvaluatedKey")
if not last_key:
break
params["ExclusiveStartKey"] = last_key

# For each run, delete all items with PK=RUN#{run_id} + cancellation flag
for run_id in run_ids:
# Query all items for this run
pk_val = f"RUN#{run_id}"
all_items: list[dict[str, Any]] = []
query_params: dict[str, Any] = {
"TableName": self.table_name,
"KeyConditionExpression": "PK = :pk",
"ExpressionAttributeValues": {":pk": {"S": pk_val}},
"ProjectionExpression": "PK, SK",
}
while True:
resp = await client.query(**query_params)
all_items.extend(resp.get("Items", []))
lk = resp.get("LastEvaluatedKey")
if not lk:
break
query_params["ExclusiveStartKey"] = lk

# Also add the cancellation flag item if it exists
all_items.append({"PK": {"S": f"CANCEL#{run_id}"}, "SK": {"S": "#FLAG"}})

# Batch delete in groups of 25 (DynamoDB limit)
for i in range(0, len(all_items), 25):
batch = all_items[i : i + 25]
request_items = {
self.table_name: [
{"DeleteRequest": {"Key": {"PK": item["PK"], "SK": item["SK"]}}}
for item in batch
]
}
await client.batch_write_item(RequestItems=request_items)

count += 1

return count

# Helper methods for converting DynamoDB items to domain objects

def _item_to_workflow_run(self, item: dict[str, Any]) -> WorkflowRun:
Expand Down
37 changes: 37 additions & 0 deletions pyworkflow/storage/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,3 +959,40 @@ async def remove_running_run(self, schedule_id: str, run_id: str) -> None:
schedule.running_run_ids.remove(run_id)
schedule.updated_at = datetime.now(UTC)
await self.update_schedule(schedule)

async def delete_old_runs(self, older_than: datetime) -> int:
"""Delete terminal runs (and all related files) updated before older_than."""
terminal = {"completed", "failed", "cancelled", "continued_as_new", "interrupted"}

def _sync_delete() -> int:
count = 0
for run_file in list(self.runs_dir.glob("*.json")):
try:
data = json.loads(run_file.read_text())
if data.get("status") not in terminal:
continue
updated_at = datetime.fromisoformat(data["updated_at"])
if updated_at >= older_than:
continue
run_id = data["run_id"]
# Delete run file
run_file.unlink(missing_ok=True)
# Delete events file
(self.events_dir / f"{run_id}.jsonl").unlink(missing_ok=True)
# Delete step files belonging to this run
for sf in list(self.steps_dir.glob("*.json")):
try:
sd = json.loads(sf.read_text())
if sd.get("run_id") == run_id:
sf.unlink(missing_ok=True)
except Exception:
pass
# Delete hook files for this run
for hf in list(self.hooks_dir.glob(f"{run_id}__*.json")):
hf.unlink(missing_ok=True)
count += 1
except Exception:
continue
return count

return await asyncio.to_thread(_sync_delete)
31 changes: 31 additions & 0 deletions pyworkflow/storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,37 @@ async def remove_running_run(self, schedule_id: str, run_id: str) -> None:
schedule.running_run_ids.remove(run_id)
schedule.updated_at = datetime.now(UTC)

async def delete_old_runs(self, older_than: datetime) -> int:
"""Delete terminal runs updated before older_than."""
terminal = {
RunStatus.COMPLETED,
RunStatus.FAILED,
RunStatus.CANCELLED,
RunStatus.CONTINUED_AS_NEW,
RunStatus.INTERRUPTED,
}
with self._lock:
to_delete = [
run_id
for run_id, run in self._runs.items()
if run.status in terminal and run.updated_at < older_than
]
for run_id in to_delete:
run = self._runs.pop(run_id)
self._events.pop(run_id, None)
self._event_sequences.pop(run_id, None)
if run.idempotency_key:
self._idempotency_index.pop(run.idempotency_key, None)
step_ids = [sid for sid, s in self._steps.items() if s.run_id == run_id]
for sid in step_ids:
del self._steps[sid]
hook_keys = [k for k in self._hooks if k[0] == run_id]
for k in hook_keys:
hook = self._hooks.pop(k)
self._token_index.pop(hook.token, None)
self._cancellation_flags.pop(run_id, None)
return len(to_delete)

# Utility methods

def clear(self) -> None:
Expand Down
Loading
Loading