Skip to content
194 changes: 183 additions & 11 deletions api/controllers/console/app/workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,90 @@
from controllers.console.wraps import account_initialization_required, setup_required
from fields.workflow_run_fields import (
advanced_chat_workflow_run_pagination_fields,
workflow_run_count_fields,
workflow_run_detail_fields,
workflow_run_node_execution_list_fields,
workflow_run_pagination_fields,
)
from libs.custom_inputs import time_duration
from libs.helper import uuid_value
from libs.login import current_user, login_required
from models import Account, App, AppMode, EndUser
from models import Account, App, AppMode, EndUser, WorkflowRunTriggeredFrom
from services.workflow_run_service import WorkflowRunService

# Workflow run status choices for filtering
WORKFLOW_RUN_STATUS_CHOICES = ["running", "succeeded", "failed", "stopped", "partial-succeeded"]


def _parse_workflow_run_list_args():
"""
Parse common arguments for workflow run list endpoints.

Returns:
Parsed arguments containing last_id, limit, status, and triggered_from filters
"""
parser = reqparse.RequestParser()
parser.add_argument("last_id", type=uuid_value, location="args")
parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args")
parser.add_argument(
"status",
type=str,
choices=WORKFLOW_RUN_STATUS_CHOICES,
location="args",
required=False,
)
parser.add_argument(
"triggered_from",
type=str,
choices=["debugging", "app-run"],
location="args",
required=False,
help="Filter by trigger source: debugging or app-run",
)
return parser.parse_args()


def _parse_workflow_run_count_args():
"""
Parse common arguments for workflow run count endpoints.

Returns:
Parsed arguments containing status, time_range, and triggered_from filters
"""
parser = reqparse.RequestParser()
parser.add_argument(
"status",
type=str,
choices=WORKFLOW_RUN_STATUS_CHOICES,
location="args",
required=False,
)
parser.add_argument(
"time_range",
type=time_duration,
location="args",
required=False,
help="Time range filter (e.g., 7d, 4h, 30m, 30s)",
)
parser.add_argument(
"triggered_from",
type=str,
choices=["debugging", "app-run"],
location="args",
required=False,
help="Filter by trigger source: debugging or app-run",
)
return parser.parse_args()


@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflow-runs")
class AdvancedChatAppWorkflowRunListApi(Resource):
@api.doc("get_advanced_chat_workflow_runs")
@api.doc(description="Get advanced chat workflow run list")
@api.doc(params={"app_id": "Application ID"})
@api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"})
@api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"})
@api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"})
@api.response(200, "Workflow runs retrieved successfully", advanced_chat_workflow_run_pagination_fields)
@setup_required
@login_required
Expand All @@ -34,13 +102,64 @@ def get(self, app_model: App):
"""
Get advanced chat app workflow run list
"""
parser = reqparse.RequestParser()
parser.add_argument("last_id", type=uuid_value, location="args")
parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args")
args = parser.parse_args()
args = _parse_workflow_run_list_args()

# Default to DEBUGGING if not specified
triggered_from = (
WorkflowRunTriggeredFrom(args.get("triggered_from"))
if args.get("triggered_from")
else WorkflowRunTriggeredFrom.DEBUGGING
)

workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_paginate_advanced_chat_workflow_runs(app_model=app_model, args=args)
result = workflow_run_service.get_paginate_advanced_chat_workflow_runs(
app_model=app_model, args=args, triggered_from=triggered_from
)

return result


@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflow-runs/count")
class AdvancedChatAppWorkflowRunCountApi(Resource):
@api.doc("get_advanced_chat_workflow_runs_count")
@api.doc(description="Get advanced chat workflow runs count statistics")
@api.doc(params={"app_id": "Application ID"})
@api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"})
@api.doc(
params={
"time_range": (
"Filter by time range (optional): e.g., 7d (7 days), 4h (4 hours), "
"30m (30 minutes), 30s (30 seconds). Filters by created_at field."
)
}
)
@api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"})
@api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields)
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@marshal_with(workflow_run_count_fields)
def get(self, app_model: App):
"""
Get advanced chat workflow runs count statistics
"""
args = _parse_workflow_run_count_args()

# Default to DEBUGGING if not specified
triggered_from = (
WorkflowRunTriggeredFrom(args.get("triggered_from"))
if args.get("triggered_from")
else WorkflowRunTriggeredFrom.DEBUGGING
)

workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_workflow_runs_count(
app_model=app_model,
status=args.get("status"),
time_range=args.get("time_range"),
triggered_from=triggered_from,
)

return result

Expand All @@ -51,6 +170,8 @@ class WorkflowRunListApi(Resource):
@api.doc(description="Get workflow run list")
@api.doc(params={"app_id": "Application ID"})
@api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"})
@api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"})
@api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"})
@api.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_fields)
@setup_required
@login_required
Expand All @@ -61,13 +182,64 @@ def get(self, app_model: App):
"""
Get workflow run list
"""
parser = reqparse.RequestParser()
parser.add_argument("last_id", type=uuid_value, location="args")
parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args")
args = parser.parse_args()
args = _parse_workflow_run_list_args()

# Default to DEBUGGING for workflow if not specified (backward compatibility)
triggered_from = (
WorkflowRunTriggeredFrom(args.get("triggered_from"))
if args.get("triggered_from")
else WorkflowRunTriggeredFrom.DEBUGGING
)

workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_paginate_workflow_runs(
app_model=app_model, args=args, triggered_from=triggered_from
)

return result


@console_ns.route("/apps/<uuid:app_id>/workflow-runs/count")
class WorkflowRunCountApi(Resource):
@api.doc("get_workflow_runs_count")
@api.doc(description="Get workflow runs count statistics")
@api.doc(params={"app_id": "Application ID"})
@api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"})
@api.doc(
params={
"time_range": (
"Filter by time range (optional): e.g., 7d (7 days), 4h (4 hours), "
"30m (30 minutes), 30s (30 seconds). Filters by created_at field."
)
}
)
@api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"})
@api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields)
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_count_fields)
def get(self, app_model: App):
"""
Get workflow runs count statistics
"""
args = _parse_workflow_run_count_args()

# Default to DEBUGGING for workflow if not specified (backward compatibility)
triggered_from = (
WorkflowRunTriggeredFrom(args.get("triggered_from"))
if args.get("triggered_from")
else WorkflowRunTriggeredFrom.DEBUGGING
)

workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_paginate_workflow_runs(app_model=app_model, args=args)
result = workflow_run_service.get_workflow_runs_count(
app_model=app_model,
status=args.get("status"),
time_range=args.get("time_range"),
triggered_from=triggered_from,
)

return result

Expand Down
9 changes: 9 additions & 0 deletions api/fields/workflow_run_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ def build_workflow_run_for_log_model(api_or_ns: Api | Namespace):
"data": fields.List(fields.Nested(workflow_run_for_list_fields), attribute="data"),
}

workflow_run_count_fields = {
"total": fields.Integer,
"running": fields.Integer,
"succeeded": fields.Integer,
"failed": fields.Integer,
"stopped": fields.Integer,
"partial_succeeded": fields.Integer(attribute="partial-succeeded"),
}
Comment thread
twjackysu marked this conversation as resolved.

workflow_run_detail_fields = {
"id": fields.String,
"version": fields.String,
Expand Down
32 changes: 32 additions & 0 deletions api/libs/custom_inputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Custom input types for Flask-RESTX request parsing."""

import re


def time_duration(value: str) -> str:
"""
Validate and return time duration string.

Accepts formats: <number>d (days), <number>h (hours), <number>m (minutes), <number>s (seconds)
Examples: 7d, 4h, 30m, 30s

Args:
value: The time duration string

Returns:
The validated time duration string

Raises:
ValueError: If the format is invalid
"""
if not value:
raise ValueError("Time duration cannot be empty")

pattern = r"^(\d+)([dhms])$"
if not re.match(pattern, value.lower()):
raise ValueError(
"Invalid time duration format. Use: <number>d (days), <number>h (hours), "
"<number>m (minutes), or <number>s (seconds). Examples: 7d, 4h, 30m, 30s"
)

return value.lower()
67 changes: 67 additions & 0 deletions api/libs/time_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Time duration parser utility."""

import re
from datetime import UTC, datetime, timedelta


def parse_time_duration(duration_str: str) -> timedelta | None:
"""
Parse time duration string to timedelta.

Supported formats:
- 7d: 7 days
- 4h: 4 hours
- 30m: 30 minutes
- 30s: 30 seconds

Args:
duration_str: Duration string (e.g., "7d", "4h", "30m", "30s")

Returns:
timedelta object or None if invalid format
"""
if not duration_str:
return None

# Pattern: number followed by unit (d, h, m, s)
pattern = r"^(\d+)([dhms])$"
match = re.match(pattern, duration_str.lower())

if not match:
return None

value = int(match.group(1))
unit = match.group(2)

if unit == "d":
return timedelta(days=value)
elif unit == "h":
return timedelta(hours=value)
elif unit == "m":
return timedelta(minutes=value)
elif unit == "s":
return timedelta(seconds=value)

return None


def get_time_threshold(duration_str: str | None) -> datetime | None:
"""
Get datetime threshold from duration string.

Calculates the datetime that is duration_str ago from now.

Args:
duration_str: Duration string (e.g., "7d", "4h", "30m", "30s")

Returns:
datetime object representing the threshold time, or None if no duration
"""
if not duration_str:
return None

duration = parse_time_duration(duration_str)
if duration is None:
return None

return datetime.now(UTC) - duration
Loading