From ee7ba2fa9c3206fb76f3182222ce7c155c9c24ac Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Mon, 13 Oct 2025 23:16:35 +0800 Subject: [PATCH 1/7] feat: add status filter for workflow run console API --- api/controllers/console/app/workflow_run.py | 4 ++++ api/repositories/api_workflow_run_repository.py | 2 ++ api/repositories/sqlalchemy_api_workflow_run_repository.py | 5 +++++ api/services/workflow_run_service.py | 2 ++ 4 files changed, 13 insertions(+) diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 23ba63845c75c5..20b5a060f37b58 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -25,6 +25,7 @@ class AdvancedChatAppWorkflowRunListApi(Resource): @api.doc(description="Get advanced chat workflow run list") @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"}) + @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) @api.response(200, "Workflow runs retrieved successfully", advanced_chat_workflow_run_pagination_fields) @setup_required @login_required @@ -38,6 +39,7 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") + parser.add_argument("status", type=str, location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() @@ -52,6 +54,7 @@ class WorkflowRunListApi(Resource): @api.doc(description="Get workflow run list") @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"}) + @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) @api.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_fields) @setup_required @login_required @@ -65,6 +68,7 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") + parser.add_argument("status", type=str, location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 3ac28fad752f79..0826e0d4f53b34 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -59,6 +59,7 @@ def get_paginated_workflow_runs( triggered_from: str, limit: int = 20, last_id: str | None = None, + status: str | None = None, ) -> InfiniteScrollPagination: """ Get paginated workflow runs with filtering. @@ -73,6 +74,7 @@ def get_paginated_workflow_runs( triggered_from: Filter by trigger source (e.g., "debugging", "app-run") limit: Maximum number of records to return (default: 20) last_id: Cursor for pagination - ID of the last record from previous page + status: Optional filter by status (e.g., "running", "succeeded", "failed") Returns: InfiniteScrollPagination object containing: diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 6154273f3380f5..afa17fc1b34858 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -63,6 +63,7 @@ def get_paginated_workflow_runs( triggered_from: str, limit: int = 20, last_id: str | None = None, + status: str | None = None, ) -> InfiniteScrollPagination: """ Get paginated workflow runs with filtering. @@ -79,6 +80,10 @@ def get_paginated_workflow_runs( WorkflowRun.triggered_from == triggered_from, ) + # Add optional status filter + if status: + base_stmt = base_stmt.where(WorkflowRun.status == status) + if last_id: # Get the last workflow run for cursor-based pagination last_run_stmt = base_stmt.where(WorkflowRun.id == last_id) diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 6a2edd912a0ba9..19ca1974b9d53a 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -70,6 +70,7 @@ def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScro """ limit = int(args.get("limit", 20)) last_id = args.get("last_id") + status = args.get("status") return self._workflow_run_repo.get_paginated_workflow_runs( tenant_id=app_model.tenant_id, @@ -77,6 +78,7 @@ def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScro triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, limit=limit, last_id=last_id, + status=status, ) def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun | None: From a04ef1046121ba9176c6b8014c0293b8830b2f02 Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Mon, 13 Oct 2025 23:23:02 +0800 Subject: [PATCH 2/7] feat: add workflow_run/count API --- api/controllers/console/app/workflow_run.py | 57 ++++++++++++++ api/fields/workflow_run_fields.py | 9 +++ .../api_workflow_run_repository.py | 32 ++++++++ .../sqlalchemy_api_workflow_run_repository.py | 74 ++++++++++++++++++- api/services/workflow_run_service.py | 30 ++++++++ 5 files changed, 201 insertions(+), 1 deletion(-) diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 20b5a060f37b58..5a1963869fe653 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -9,6 +9,7 @@ from controllers.console.wraps import account_initialization_required, setup_required from fields.workflow_run_fields import ( advanced_chat_workflow_run_pagination_fields, + workflow_run_count_fields, workflow_run_detail_fields, workflow_run_node_execution_list_fields, workflow_run_pagination_fields, @@ -48,6 +49,34 @@ def get(self, app_model: App): return result +@console_ns.route("/apps//advanced-chat/workflow-runs/count") +class AdvancedChatAppWorkflowRunCountApi(Resource): + @api.doc("get_advanced_chat_workflow_runs_count") + @api.doc(description="Get advanced chat workflow runs count statistics") + @api.doc(params={"app_id": "Application ID"}) + @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT]) + @marshal_with(workflow_run_count_fields) + def get(self, app_model: App): + """ + Get advanced chat workflow runs count statistics + """ + parser = reqparse.RequestParser() + parser.add_argument("status", type=str, location="args", required=False) + args = parser.parse_args() + + workflow_run_service = WorkflowRunService() + result = workflow_run_service.get_advanced_chat_workflow_runs_count( + app_model=app_model, status=args.get("status") + ) + + return result + + @console_ns.route("/apps//workflow-runs") class WorkflowRunListApi(Resource): @api.doc("get_workflow_runs") @@ -77,6 +106,34 @@ def get(self, app_model: App): return result +@console_ns.route("/apps//workflow-runs/count") +class WorkflowRunCountApi(Resource): + @api.doc("get_workflow_runs_count") + @api.doc(description="Get workflow runs count statistics") + @api.doc(params={"app_id": "Application ID"}) + @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_count_fields) + def get(self, app_model: App): + """ + Get workflow runs count statistics + """ + parser = reqparse.RequestParser() + parser.add_argument("status", type=str, location="args", required=False) + args = parser.parse_args() + + workflow_run_service = WorkflowRunService() + result = workflow_run_service.get_workflow_runs_count( + app_model=app_model, status=args.get("status") + ) + + return result + + @console_ns.route("/apps//workflow-runs/") class WorkflowRunDetailApi(Resource): @api.doc("get_workflow_run_detail") diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 649e881848835e..bf749b067af53b 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -64,6 +64,15 @@ def build_workflow_run_for_log_model(api_or_ns: Api | Namespace): "data": fields.List(fields.Nested(workflow_run_for_list_fields), attribute="data"), } +workflow_run_count_fields = { + "total": fields.Integer, + "running": fields.Integer, + "succeeded": fields.Integer, + "failed": fields.Integer, + "stopped": fields.Integer, + "partial-succeeded": fields.Integer, +} + workflow_run_detail_fields = { "id": fields.String, "version": fields.String, diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 0826e0d4f53b34..1a0024b5400eaf 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -109,6 +109,38 @@ def get_workflow_run_by_id( """ ... + def get_workflow_runs_count( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + status: str | None = None, + ) -> dict[str, int]: + """ + Get workflow runs count statistics. + + Retrieves total count and count by status for workflow runs + matching the specified filters. + + Args: + tenant_id: Tenant identifier for multi-tenant isolation + app_id: Application identifier + triggered_from: Filter by trigger source (e.g., "debugging", "app-run") + status: Optional filter by specific status + + Returns: + Dictionary containing: + - total: Total count of all workflow runs (or filtered by status) + - running: Count of workflow runs with status "running" + - succeeded: Count of workflow runs with status "succeeded" + - failed: Count of workflow runs with status "failed" + - stopped: Count of workflow runs with status "stopped" + - partial_succeeded: Count of workflow runs with status "partial-succeeded" + + Note: If status is provided, only total will have value and other counts will be 0 + """ + ... + def get_expired_runs_batch( self, tenant_id: str, diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index afa17fc1b34858..fef0b31b4815d8 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -24,7 +24,7 @@ from datetime import datetime from typing import cast -from sqlalchemy import delete, select +from sqlalchemy import delete, func, select from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session, sessionmaker @@ -125,6 +125,78 @@ def get_workflow_run_by_id( ) return session.scalar(stmt) + def get_workflow_runs_count( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + status: str | None = None, + ) -> dict[str, int]: + """ + Get workflow runs count statistics grouped by status. + """ + with self._session_maker() as session: + # If status filter is provided, return simple count + if status: + count_stmt = select( + func.count(WorkflowRun.id) + ).where( + WorkflowRun.tenant_id == tenant_id, + WorkflowRun.app_id == app_id, + WorkflowRun.triggered_from == triggered_from, + WorkflowRun.status == status, + ) + total = session.scalar(count_stmt) or 0 + + # Initialize all status counts to 0 + result = { + "total": total, + "running": 0, + "succeeded": 0, + "failed": 0, + "stopped": 0, + "partial-succeeded": 0, + } + + # Set the count for the filtered status + if status in result: + result[status] = total + + return result + + # No status filter - get counts grouped by status + base_stmt = select( + WorkflowRun.status, + func.count(WorkflowRun.id).label("count") + ).where( + WorkflowRun.tenant_id == tenant_id, + WorkflowRun.app_id == app_id, + WorkflowRun.triggered_from == triggered_from, + ).group_by(WorkflowRun.status) + + # Execute query + results = session.execute(base_stmt).all() + + # Build response dictionary + status_counts = { + "running": 0, + "succeeded": 0, + "failed": 0, + "stopped": 0, + "partial-succeeded": 0, + } + + total = 0 + for status, count in results: + total += count + if status in status_counts: + status_counts[status] = count + + return { + "total": total, + **status_counts, + } + def get_expired_runs_batch( self, tenant_id: str, diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 19ca1974b9d53a..859c2d4b989f66 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -94,6 +94,36 @@ def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun | None: run_id=run_id, ) + def get_workflow_runs_count(self, app_model: App, status: str | None = None) -> dict[str, int]: + """ + Get workflow runs count statistics + + :param app_model: app model + :param status: optional status filter + :return: dict with total and status counts + """ + return self._workflow_run_repo.get_workflow_runs_count( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status=status, + ) + + def get_advanced_chat_workflow_runs_count(self, app_model: App, status: str | None = None) -> dict[str, int]: + """ + Get advanced chat workflow runs count statistics + + :param app_model: app model + :param status: optional status filter + :return: dict with total and status counts + """ + return self._workflow_run_repo.get_workflow_runs_count( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status=status, + ) + def get_workflow_run_node_executions( self, app_model: App, From a8f01c28b73b587715c9fdbc5134725dc7b011bf Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Tue, 14 Oct 2025 07:22:30 +0800 Subject: [PATCH 3/7] fix: fix gemini-code-assist suggestion --- api/controllers/console/app/workflow_run.py | 14 ++++----- api/fields/workflow_run_fields.py | 2 +- .../api_workflow_run_repository.py | 2 +- .../sqlalchemy_api_workflow_run_repository.py | 29 +++++++++---------- api/services/workflow_run_service.py | 15 ---------- 5 files changed, 21 insertions(+), 41 deletions(-) diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 5a1963869fe653..886a460de09560 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -40,7 +40,7 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument("status", type=str, location="args", required=False) + parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() @@ -66,11 +66,11 @@ def get(self, app_model: App): Get advanced chat workflow runs count statistics """ parser = reqparse.RequestParser() - parser.add_argument("status", type=str, location="args", required=False) + parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_advanced_chat_workflow_runs_count( + result = workflow_run_service.get_workflow_runs_count( app_model=app_model, status=args.get("status") ) @@ -97,7 +97,7 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument("status", type=str, location="args", required=False) + parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() @@ -123,13 +123,11 @@ def get(self, app_model: App): Get workflow runs count statistics """ parser = reqparse.RequestParser() - parser.add_argument("status", type=str, location="args", required=False) + parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_workflow_runs_count( - app_model=app_model, status=args.get("status") - ) + result = workflow_run_service.get_workflow_runs_count(app_model=app_model, status=args.get("status")) return result diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index bf749b067af53b..79594beeede74e 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -70,7 +70,7 @@ def build_workflow_run_for_log_model(api_or_ns: Api | Namespace): "succeeded": fields.Integer, "failed": fields.Integer, "stopped": fields.Integer, - "partial-succeeded": fields.Integer, + "partial_succeeded": fields.Integer(attribute="partial-succeeded"), } workflow_run_detail_fields = { diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 1a0024b5400eaf..2fbc76183bed26 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -137,7 +137,7 @@ def get_workflow_runs_count( - stopped: Count of workflow runs with status "stopped" - partial_succeeded: Count of workflow runs with status "partial-succeeded" - Note: If status is provided, only total will have value and other counts will be 0 + Note: If a status is provided, 'total' will be the count for that status, and the specific status count will also be set to this value, with all other status counts being 0. """ ... diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index fef0b31b4815d8..789e264743a3bf 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -135,6 +135,14 @@ def get_workflow_runs_count( """ Get workflow runs count statistics grouped by status. """ + _initial_status_counts = { + "running": 0, + "succeeded": 0, + "failed": 0, + "stopped": 0, + "partial-succeeded": 0, + } + with self._session_maker() as session: # If status filter is provided, return simple count if status: @@ -148,14 +156,9 @@ def get_workflow_runs_count( ) total = session.scalar(count_stmt) or 0 - # Initialize all status counts to 0 result = { "total": total, - "running": 0, - "succeeded": 0, - "failed": 0, - "stopped": 0, - "partial-succeeded": 0, + **_initial_status_counts, } # Set the count for the filtered status @@ -178,19 +181,13 @@ def get_workflow_runs_count( results = session.execute(base_stmt).all() # Build response dictionary - status_counts = { - "running": 0, - "succeeded": 0, - "failed": 0, - "stopped": 0, - "partial-succeeded": 0, - } + status_counts = _initial_status_counts.copy() total = 0 - for status, count in results: + for status_val, count in results: total += count - if status in status_counts: - status_counts[status] = count + if status_val in status_counts: + status_counts[status_val] = count return { "total": total, diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 859c2d4b989f66..8269eca6981e1b 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -109,21 +109,6 @@ def get_workflow_runs_count(self, app_model: App, status: str | None = None) -> status=status, ) - def get_advanced_chat_workflow_runs_count(self, app_model: App, status: str | None = None) -> dict[str, int]: - """ - Get advanced chat workflow runs count statistics - - :param app_model: app model - :param status: optional status filter - :return: dict with total and status counts - """ - return self._workflow_run_repo.get_workflow_runs_count( - tenant_id=app_model.tenant_id, - app_id=app_model.id, - triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, - status=status, - ) - def get_workflow_run_node_executions( self, app_model: App, From a5a883d3759dd36386aa66d2f8ef52594ddbad2c Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Tue, 14 Oct 2025 08:07:01 +0800 Subject: [PATCH 4/7] test: add UT --- .../test_workflow_run_repository.py | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 api/tests/unit_tests/repositories/test_workflow_run_repository.py diff --git a/api/tests/unit_tests/repositories/test_workflow_run_repository.py b/api/tests/unit_tests/repositories/test_workflow_run_repository.py new file mode 100644 index 00000000000000..45b205186a541f --- /dev/null +++ b/api/tests/unit_tests/repositories/test_workflow_run_repository.py @@ -0,0 +1,157 @@ +"""Unit tests for workflow run repository with status filter.""" + +import uuid +from unittest.mock import MagicMock, patch + +import pytest +from sqlalchemy.orm import sessionmaker + +from models import WorkflowRun, WorkflowRunTriggeredFrom +from repositories.sqlalchemy_api_workflow_run_repository import DifyAPISQLAlchemyWorkflowRunRepository + + +class TestDifyAPISQLAlchemyWorkflowRunRepository: + """Test workflow run repository with status filtering.""" + + @pytest.fixture + def mock_session_maker(self): + """Create a mock session maker.""" + return MagicMock(spec=sessionmaker) + + @pytest.fixture + def repository(self, mock_session_maker): + """Create repository instance with mock session.""" + return DifyAPISQLAlchemyWorkflowRunRepository(mock_session_maker) + + def test_get_paginated_workflow_runs_without_status(self, repository, mock_session_maker): + """Test getting paginated workflow runs without status filter.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + mock_runs = [MagicMock(spec=WorkflowRun) for _ in range(3)] + mock_session.scalars.return_value.all.return_value = mock_runs + + # Act + result = repository.get_paginated_workflow_runs( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + limit=20, + last_id=None, + status=None, + ) + + # Assert + assert len(result.data) == 3 + assert result.limit == 20 + assert result.has_more is False + + def test_get_paginated_workflow_runs_with_status_filter(self, repository, mock_session_maker): + """Test getting paginated workflow runs with status filter.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + mock_runs = [MagicMock(spec=WorkflowRun, status="succeeded") for _ in range(2)] + mock_session.scalars.return_value.all.return_value = mock_runs + + # Act + result = repository.get_paginated_workflow_runs( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + limit=20, + last_id=None, + status="succeeded", + ) + + # Assert + assert len(result.data) == 2 + assert all(run.status == "succeeded" for run in result.data) + + def test_get_workflow_runs_count_without_status(self, repository, mock_session_maker): + """Test getting workflow runs count without status filter.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + # Mock the GROUP BY query results + mock_results = [ + ("succeeded", 5), + ("failed", 2), + ("running", 1), + ] + mock_session.execute.return_value.all.return_value = mock_results + + # Act + result = repository.get_workflow_runs_count( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status=None, + ) + + # Assert + assert result["total"] == 8 + assert result["succeeded"] == 5 + assert result["failed"] == 2 + assert result["running"] == 1 + assert result["stopped"] == 0 + assert result["partial-succeeded"] == 0 + + def test_get_workflow_runs_count_with_status_filter(self, repository, mock_session_maker): + """Test getting workflow runs count with status filter.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + # Mock the count query for succeeded status + mock_session.scalar.return_value = 5 + + # Act + result = repository.get_workflow_runs_count( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status="succeeded", + ) + + # Assert + assert result["total"] == 5 + assert result["succeeded"] == 5 + assert result["running"] == 0 + assert result["failed"] == 0 + assert result["stopped"] == 0 + assert result["partial-succeeded"] == 0 + + def test_get_workflow_runs_count_with_invalid_status(self, repository, mock_session_maker): + """Test that invalid status is still counted in total but not in any specific status.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + # Mock count query returning 0 for invalid status + mock_session.scalar.return_value = 0 + + # Act + result = repository.get_workflow_runs_count( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status="invalid_status", + ) + + # Assert + assert result["total"] == 0 + assert all(result[status] == 0 for status in ["running", "succeeded", "failed", "stopped", "partial-succeeded"]) From 1c52b7fb7b2ed7b41fdfd9fe14f92e98eab9eef2 Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Tue, 14 Oct 2025 10:20:15 +0800 Subject: [PATCH 5/7] fix: reformat after fix suggestion & add UI --- api/controllers/console/app/workflow_run.py | 36 +++++++++++++++---- .../api_workflow_run_repository.py | 4 ++- .../sqlalchemy_api_workflow_run_repository.py | 21 ++++++----- .../test_workflow_run_repository.py | 12 +++---- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 886a460de09560..52020d33a45be7 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -40,7 +40,13 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) + parser.add_argument( + "status", + type=str, + choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], + location="args", + required=False, + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() @@ -66,13 +72,17 @@ def get(self, app_model: App): Get advanced chat workflow runs count statistics """ parser = reqparse.RequestParser() - parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) + parser.add_argument( + "status", + type=str, + choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], + location="args", + required=False, + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_workflow_runs_count( - app_model=app_model, status=args.get("status") - ) + result = workflow_run_service.get_workflow_runs_count(app_model=app_model, status=args.get("status")) return result @@ -97,7 +107,13 @@ def get(self, app_model: App): parser = reqparse.RequestParser() parser.add_argument("last_id", type=uuid_value, location="args") parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) + parser.add_argument( + "status", + type=str, + choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], + location="args", + required=False, + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() @@ -123,7 +139,13 @@ def get(self, app_model: App): Get workflow runs count statistics """ parser = reqparse.RequestParser() - parser.add_argument("status", type=str, choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], location="args", required=False) + parser.add_argument( + "status", + type=str, + choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], + location="args", + required=False, + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 2fbc76183bed26..34b8a92cf9c9b3 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -137,7 +137,9 @@ def get_workflow_runs_count( - stopped: Count of workflow runs with status "stopped" - partial_succeeded: Count of workflow runs with status "partial-succeeded" - Note: If a status is provided, 'total' will be the count for that status, and the specific status count will also be set to this value, with all other status counts being 0. + Note: If a status is provided, 'total' will be the count for that status, + and the specific status count will also be set to this value, with all + other status counts being 0. """ ... diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 789e264743a3bf..c988a9f2845a98 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -146,9 +146,7 @@ def get_workflow_runs_count( with self._session_maker() as session: # If status filter is provided, return simple count if status: - count_stmt = select( - func.count(WorkflowRun.id) - ).where( + count_stmt = select(func.count(WorkflowRun.id)).where( WorkflowRun.tenant_id == tenant_id, WorkflowRun.app_id == app_id, WorkflowRun.triggered_from == triggered_from, @@ -168,14 +166,15 @@ def get_workflow_runs_count( return result # No status filter - get counts grouped by status - base_stmt = select( - WorkflowRun.status, - func.count(WorkflowRun.id).label("count") - ).where( - WorkflowRun.tenant_id == tenant_id, - WorkflowRun.app_id == app_id, - WorkflowRun.triggered_from == triggered_from, - ).group_by(WorkflowRun.status) + base_stmt = ( + select(WorkflowRun.status, func.count(WorkflowRun.id).label("count")) + .where( + WorkflowRun.tenant_id == tenant_id, + WorkflowRun.app_id == app_id, + WorkflowRun.triggered_from == triggered_from, + ) + .group_by(WorkflowRun.status) + ) # Execute query results = session.execute(base_stmt).all() diff --git a/api/tests/unit_tests/repositories/test_workflow_run_repository.py b/api/tests/unit_tests/repositories/test_workflow_run_repository.py index 45b205186a541f..143ad926d82874 100644 --- a/api/tests/unit_tests/repositories/test_workflow_run_repository.py +++ b/api/tests/unit_tests/repositories/test_workflow_run_repository.py @@ -1,7 +1,7 @@ """Unit tests for workflow run repository with status filter.""" import uuid -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest from sqlalchemy.orm import sessionmaker @@ -30,7 +30,7 @@ def test_get_paginated_workflow_runs_without_status(self, repository, mock_sessi app_id = str(uuid.uuid4()) mock_session = MagicMock() mock_session_maker.return_value.__enter__.return_value = mock_session - + mock_runs = [MagicMock(spec=WorkflowRun) for _ in range(3)] mock_session.scalars.return_value.all.return_value = mock_runs @@ -56,7 +56,7 @@ def test_get_paginated_workflow_runs_with_status_filter(self, repository, mock_s app_id = str(uuid.uuid4()) mock_session = MagicMock() mock_session_maker.return_value.__enter__.return_value = mock_session - + mock_runs = [MagicMock(spec=WorkflowRun, status="succeeded") for _ in range(2)] mock_session.scalars.return_value.all.return_value = mock_runs @@ -81,7 +81,7 @@ def test_get_workflow_runs_count_without_status(self, repository, mock_session_m app_id = str(uuid.uuid4()) mock_session = MagicMock() mock_session_maker.return_value.__enter__.return_value = mock_session - + # Mock the GROUP BY query results mock_results = [ ("succeeded", 5), @@ -113,7 +113,7 @@ def test_get_workflow_runs_count_with_status_filter(self, repository, mock_sessi app_id = str(uuid.uuid4()) mock_session = MagicMock() mock_session_maker.return_value.__enter__.return_value = mock_session - + # Mock the count query for succeeded status mock_session.scalar.return_value = 5 @@ -140,7 +140,7 @@ def test_get_workflow_runs_count_with_invalid_status(self, repository, mock_sess app_id = str(uuid.uuid4()) mock_session = MagicMock() mock_session_maker.return_value.__enter__.return_value = mock_session - + # Mock count query returning 0 for invalid status mock_session.scalar.return_value = 0 From 16a571784af53d444ecc1bdf105e9d6c3967b36c Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Tue, 14 Oct 2025 21:29:36 +0800 Subject: [PATCH 6/7] feat: add time_range filter for monitor --- api/controllers/console/app/workflow_run.py | 39 +++++++- api/libs/custom_inputs.py | 32 +++++++ api/libs/time_parser.py | 67 +++++++++++++ .../api_workflow_run_repository.py | 3 + .../sqlalchemy_api_workflow_run_repository.py | 29 +++--- api/services/workflow_run_service.py | 6 +- .../unit_tests/libs/test_custom_inputs.py | 68 ++++++++++++++ api/tests/unit_tests/libs/test_time_parser.py | 91 ++++++++++++++++++ .../test_workflow_run_repository.py | 94 +++++++++++++++++++ 9 files changed, 415 insertions(+), 14 deletions(-) create mode 100644 api/libs/custom_inputs.py create mode 100644 api/libs/time_parser.py create mode 100644 api/tests/unit_tests/libs/test_custom_inputs.py create mode 100644 api/tests/unit_tests/libs/test_time_parser.py diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 52020d33a45be7..6795747d74faaa 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -14,6 +14,7 @@ workflow_run_node_execution_list_fields, workflow_run_pagination_fields, ) +from libs.custom_inputs import time_duration from libs.helper import uuid_value from libs.login import login_required from models import Account, App, AppMode, EndUser @@ -61,6 +62,14 @@ class AdvancedChatAppWorkflowRunCountApi(Resource): @api.doc(description="Get advanced chat workflow runs count statistics") @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.doc( + params={ + "time_range": ( + "Filter by time range (optional): e.g., 7d (7 days), 4h (4 hours), " + "30m (30 minutes), 30s (30 seconds). Filters by created_at field." + ) + } + ) @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) @setup_required @login_required @@ -79,10 +88,19 @@ def get(self, app_model: App): location="args", required=False, ) + parser.add_argument( + "time_range", + type=time_duration, + location="args", + required=False, + help="Time range filter (e.g., 7d, 4h, 30m, 30s)", + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_workflow_runs_count(app_model=app_model, status=args.get("status")) + result = workflow_run_service.get_workflow_runs_count( + app_model=app_model, status=args.get("status"), time_range=args.get("time_range") + ) return result @@ -128,6 +146,14 @@ class WorkflowRunCountApi(Resource): @api.doc(description="Get workflow runs count statistics") @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.doc( + params={ + "time_range": ( + "Filter by time range (optional): e.g., 7d (7 days), 4h (4 hours), " + "30m (30 minutes), 30s (30 seconds). Filters by created_at field." + ) + } + ) @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) @setup_required @login_required @@ -146,10 +172,19 @@ def get(self, app_model: App): location="args", required=False, ) + parser.add_argument( + "time_range", + type=time_duration, + location="args", + required=False, + help="Time range filter (e.g., 7d, 4h, 30m, 30s)", + ) args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_workflow_runs_count(app_model=app_model, status=args.get("status")) + result = workflow_run_service.get_workflow_runs_count( + app_model=app_model, status=args.get("status"), time_range=args.get("time_range") + ) return result diff --git a/api/libs/custom_inputs.py b/api/libs/custom_inputs.py new file mode 100644 index 00000000000000..10d550ed65e0d7 --- /dev/null +++ b/api/libs/custom_inputs.py @@ -0,0 +1,32 @@ +"""Custom input types for Flask-RESTX request parsing.""" + +import re + + +def time_duration(value: str) -> str: + """ + Validate and return time duration string. + + Accepts formats: d (days), h (hours), m (minutes), s (seconds) + Examples: 7d, 4h, 30m, 30s + + Args: + value: The time duration string + + Returns: + The validated time duration string + + Raises: + ValueError: If the format is invalid + """ + if not value: + raise ValueError("Time duration cannot be empty") + + pattern = r"^(\d+)([dhms])$" + if not re.match(pattern, value.lower()): + raise ValueError( + "Invalid time duration format. Use: d (days), h (hours), " + "m (minutes), or s (seconds). Examples: 7d, 4h, 30m, 30s" + ) + + return value.lower() diff --git a/api/libs/time_parser.py b/api/libs/time_parser.py new file mode 100644 index 00000000000000..1d9dd92a08b8d1 --- /dev/null +++ b/api/libs/time_parser.py @@ -0,0 +1,67 @@ +"""Time duration parser utility.""" + +import re +from datetime import UTC, datetime, timedelta + + +def parse_time_duration(duration_str: str) -> timedelta | None: + """ + Parse time duration string to timedelta. + + Supported formats: + - 7d: 7 days + - 4h: 4 hours + - 30m: 30 minutes + - 30s: 30 seconds + + Args: + duration_str: Duration string (e.g., "7d", "4h", "30m", "30s") + + Returns: + timedelta object or None if invalid format + """ + if not duration_str: + return None + + # Pattern: number followed by unit (d, h, m, s) + pattern = r"^(\d+)([dhms])$" + match = re.match(pattern, duration_str.lower()) + + if not match: + return None + + value = int(match.group(1)) + unit = match.group(2) + + if unit == "d": + return timedelta(days=value) + elif unit == "h": + return timedelta(hours=value) + elif unit == "m": + return timedelta(minutes=value) + elif unit == "s": + return timedelta(seconds=value) + + return None + + +def get_time_threshold(duration_str: str | None) -> datetime | None: + """ + Get datetime threshold from duration string. + + Calculates the datetime that is duration_str ago from now. + + Args: + duration_str: Duration string (e.g., "7d", "4h", "30m", "30s") + + Returns: + datetime object representing the threshold time, or None if no duration + """ + if not duration_str: + return None + + duration = parse_time_duration(duration_str) + if duration is None: + return None + + return datetime.now(UTC) - duration diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 34b8a92cf9c9b3..72de9fed3110c3 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -115,6 +115,7 @@ def get_workflow_runs_count( app_id: str, triggered_from: str, status: str | None = None, + time_range: str | None = None, ) -> dict[str, int]: """ Get workflow runs count statistics. @@ -127,6 +128,8 @@ def get_workflow_runs_count( app_id: Application identifier triggered_from: Filter by trigger source (e.g., "debugging", "app-run") status: Optional filter by specific status + time_range: Optional time range filter (e.g., "7d", "4h", "30m", "30s") + Filters records based on created_at field Returns: Dictionary containing: diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index c988a9f2845a98..85d555b437461c 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -131,10 +131,13 @@ def get_workflow_runs_count( app_id: str, triggered_from: str, status: str | None = None, + time_range: str | None = None, ) -> dict[str, int]: """ Get workflow runs count statistics grouped by status. """ + from libs.time_parser import get_time_threshold + _initial_status_counts = { "running": 0, "succeeded": 0, @@ -144,14 +147,22 @@ def get_workflow_runs_count( } with self._session_maker() as session: + # Build base where conditions + base_conditions = [ + WorkflowRun.tenant_id == tenant_id, + WorkflowRun.app_id == app_id, + WorkflowRun.triggered_from == triggered_from, + ] + + # Add time range filter if provided + if time_range: + time_threshold = get_time_threshold(time_range) + if time_threshold: + base_conditions.append(WorkflowRun.created_at >= time_threshold) + # If status filter is provided, return simple count if status: - count_stmt = select(func.count(WorkflowRun.id)).where( - WorkflowRun.tenant_id == tenant_id, - WorkflowRun.app_id == app_id, - WorkflowRun.triggered_from == triggered_from, - WorkflowRun.status == status, - ) + count_stmt = select(func.count(WorkflowRun.id)).where(*base_conditions, WorkflowRun.status == status) total = session.scalar(count_stmt) or 0 result = { @@ -168,11 +179,7 @@ def get_workflow_runs_count( # No status filter - get counts grouped by status base_stmt = ( select(WorkflowRun.status, func.count(WorkflowRun.id).label("count")) - .where( - WorkflowRun.tenant_id == tenant_id, - WorkflowRun.app_id == app_id, - WorkflowRun.triggered_from == triggered_from, - ) + .where(*base_conditions) .group_by(WorkflowRun.status) ) diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 8269eca6981e1b..9a69b301fc0ba2 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -94,12 +94,15 @@ def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun | None: run_id=run_id, ) - def get_workflow_runs_count(self, app_model: App, status: str | None = None) -> dict[str, int]: + def get_workflow_runs_count( + self, app_model: App, status: str | None = None, time_range: str | None = None + ) -> dict[str, int]: """ Get workflow runs count statistics :param app_model: app model :param status: optional status filter + :param time_range: optional time range filter (e.g., "7d", "4h", "30m", "30s") :return: dict with total and status counts """ return self._workflow_run_repo.get_workflow_runs_count( @@ -107,6 +110,7 @@ def get_workflow_runs_count(self, app_model: App, status: str | None = None) -> app_id=app_model.id, triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, status=status, + time_range=time_range, ) def get_workflow_run_node_executions( diff --git a/api/tests/unit_tests/libs/test_custom_inputs.py b/api/tests/unit_tests/libs/test_custom_inputs.py new file mode 100644 index 00000000000000..7e4c3b4ff0ea46 --- /dev/null +++ b/api/tests/unit_tests/libs/test_custom_inputs.py @@ -0,0 +1,68 @@ +"""Unit tests for custom input types.""" + +import pytest + +from libs.custom_inputs import time_duration + + +class TestTimeDuration: + """Test time_duration input validator.""" + + def test_valid_days(self): + """Test valid days format.""" + result = time_duration("7d") + assert result == "7d" + + def test_valid_hours(self): + """Test valid hours format.""" + result = time_duration("4h") + assert result == "4h" + + def test_valid_minutes(self): + """Test valid minutes format.""" + result = time_duration("30m") + assert result == "30m" + + def test_valid_seconds(self): + """Test valid seconds format.""" + result = time_duration("30s") + assert result == "30s" + + def test_uppercase_conversion(self): + """Test uppercase units are converted to lowercase.""" + result = time_duration("7D") + assert result == "7d" + + result = time_duration("4H") + assert result == "4h" + + def test_invalid_format_no_unit(self): + """Test invalid format without unit.""" + with pytest.raises(ValueError, match="Invalid time duration format"): + time_duration("7") + + def test_invalid_format_wrong_unit(self): + """Test invalid format with wrong unit.""" + with pytest.raises(ValueError, match="Invalid time duration format"): + time_duration("7days") + + with pytest.raises(ValueError, match="Invalid time duration format"): + time_duration("7x") + + def test_invalid_format_no_number(self): + """Test invalid format without number.""" + with pytest.raises(ValueError, match="Invalid time duration format"): + time_duration("d") + + with pytest.raises(ValueError, match="Invalid time duration format"): + time_duration("abc") + + def test_empty_string(self): + """Test empty string.""" + with pytest.raises(ValueError, match="Time duration cannot be empty"): + time_duration("") + + def test_none(self): + """Test None value.""" + with pytest.raises(ValueError, match="Time duration cannot be empty"): + time_duration(None) diff --git a/api/tests/unit_tests/libs/test_time_parser.py b/api/tests/unit_tests/libs/test_time_parser.py new file mode 100644 index 00000000000000..83ff251272ad08 --- /dev/null +++ b/api/tests/unit_tests/libs/test_time_parser.py @@ -0,0 +1,91 @@ +"""Unit tests for time parser utility.""" + +from datetime import UTC, datetime, timedelta + +from libs.time_parser import get_time_threshold, parse_time_duration + + +class TestParseTimeDuration: + """Test parse_time_duration function.""" + + def test_parse_days(self): + """Test parsing days.""" + result = parse_time_duration("7d") + assert result == timedelta(days=7) + + def test_parse_hours(self): + """Test parsing hours.""" + result = parse_time_duration("4h") + assert result == timedelta(hours=4) + + def test_parse_minutes(self): + """Test parsing minutes.""" + result = parse_time_duration("30m") + assert result == timedelta(minutes=30) + + def test_parse_seconds(self): + """Test parsing seconds.""" + result = parse_time_duration("30s") + assert result == timedelta(seconds=30) + + def test_parse_uppercase(self): + """Test parsing uppercase units.""" + result = parse_time_duration("7D") + assert result == timedelta(days=7) + + def test_parse_invalid_format(self): + """Test parsing invalid format.""" + result = parse_time_duration("7days") + assert result is None + + result = parse_time_duration("abc") + assert result is None + + result = parse_time_duration("7") + assert result is None + + def test_parse_empty_string(self): + """Test parsing empty string.""" + result = parse_time_duration("") + assert result is None + + def test_parse_none(self): + """Test parsing None.""" + result = parse_time_duration(None) + assert result is None + + +class TestGetTimeThreshold: + """Test get_time_threshold function.""" + + def test_get_threshold_days(self): + """Test getting threshold for days.""" + before = datetime.now(UTC) + result = get_time_threshold("7d") + after = datetime.now(UTC) + + assert result is not None + # Result should be approximately 7 days ago + expected = before - timedelta(days=7) + # Allow 1 second tolerance for test execution time + assert abs((result - expected).total_seconds()) < 1 + + def test_get_threshold_hours(self): + """Test getting threshold for hours.""" + before = datetime.now(UTC) + result = get_time_threshold("4h") + after = datetime.now(UTC) + + assert result is not None + expected = before - timedelta(hours=4) + assert abs((result - expected).total_seconds()) < 1 + + def test_get_threshold_invalid(self): + """Test getting threshold with invalid duration.""" + result = get_time_threshold("invalid") + assert result is None + + def test_get_threshold_none(self): + """Test getting threshold with None.""" + result = get_time_threshold(None) + assert result is None diff --git a/api/tests/unit_tests/repositories/test_workflow_run_repository.py b/api/tests/unit_tests/repositories/test_workflow_run_repository.py index 143ad926d82874..8f47f0df481892 100644 --- a/api/tests/unit_tests/repositories/test_workflow_run_repository.py +++ b/api/tests/unit_tests/repositories/test_workflow_run_repository.py @@ -155,3 +155,97 @@ def test_get_workflow_runs_count_with_invalid_status(self, repository, mock_sess # Assert assert result["total"] == 0 assert all(result[status] == 0 for status in ["running", "succeeded", "failed", "stopped", "partial-succeeded"]) + + def test_get_workflow_runs_count_with_time_range(self, repository, mock_session_maker): + """Test getting workflow runs count with time range filter verifies SQL query construction.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + # Mock the GROUP BY query results + mock_results = [ + ("succeeded", 3), + ("running", 2), + ] + mock_session.execute.return_value.all.return_value = mock_results + + # Act + result = repository.get_workflow_runs_count( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status=None, + time_range="1d", + ) + + # Assert results + assert result["total"] == 5 + assert result["succeeded"] == 3 + assert result["running"] == 2 + assert result["failed"] == 0 + + # Verify that execute was called (which means GROUP BY query was used) + assert mock_session.execute.called, "execute should have been called for GROUP BY query" + + # Verify SQL query includes time filter by checking the statement + call_args = mock_session.execute.call_args + assert call_args is not None, "execute should have been called with a statement" + + # The first argument should be the SQL statement + stmt = call_args[0][0] + # Convert to string to inspect the query + query_str = str(stmt.compile(compile_kwargs={"literal_binds": True})) + + # Verify the query includes created_at filter + # The query should have a WHERE clause with created_at comparison + assert "created_at" in query_str.lower() or "workflow_runs.created_at" in query_str.lower(), ( + "Query should include created_at filter for time range" + ) + + def test_get_workflow_runs_count_with_status_and_time_range(self, repository, mock_session_maker): + """Test getting workflow runs count with both status and time range filters verifies SQL query.""" + # Arrange + tenant_id = str(uuid.uuid4()) + app_id = str(uuid.uuid4()) + mock_session = MagicMock() + mock_session_maker.return_value.__enter__.return_value = mock_session + + # Mock the count query for running status within time range + mock_session.scalar.return_value = 2 + + # Act + result = repository.get_workflow_runs_count( + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + status="running", + time_range="1d", + ) + + # Assert results + assert result["total"] == 2 + assert result["running"] == 2 + assert result["succeeded"] == 0 + assert result["failed"] == 0 + + # Verify that scalar was called (which means COUNT query was used) + assert mock_session.scalar.called, "scalar should have been called for count query" + + # Verify SQL query includes both status and time filter + call_args = mock_session.scalar.call_args + assert call_args is not None, "scalar should have been called with a statement" + + # The first argument should be the SQL statement + stmt = call_args[0][0] + # Convert to string to inspect the query + query_str = str(stmt.compile(compile_kwargs={"literal_binds": True})) + + # Verify the query includes both filters + assert "created_at" in query_str.lower() or "workflow_runs.created_at" in query_str.lower(), ( + "Query should include created_at filter for time range" + ) + assert "status" in query_str.lower() or "workflow_runs.status" in query_str.lower(), ( + "Query should include status filter" + ) From 79e1e533749367e98452d68ab7c5058ccf484548 Mon Sep 17 00:00:00 2001 From: Jacky Su Date: Tue, 14 Oct 2025 21:53:29 +0800 Subject: [PATCH 7/7] fix: fix gemini suggestion --- api/controllers/console/app/workflow_run.py | 166 ++++++++++++------ .../sqlalchemy_api_workflow_run_repository.py | 13 +- api/services/workflow_run_service.py | 27 ++- 3 files changed, 132 insertions(+), 74 deletions(-) diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py index 6795747d74faaa..2f5f01de478d2b 100644 --- a/api/controllers/console/app/workflow_run.py +++ b/api/controllers/console/app/workflow_run.py @@ -17,9 +17,73 @@ from libs.custom_inputs import time_duration from libs.helper import uuid_value from libs.login import login_required -from models import Account, App, AppMode, EndUser +from models import Account, App, AppMode, EndUser, WorkflowRunTriggeredFrom from services.workflow_run_service import WorkflowRunService +# Workflow run status choices for filtering +WORKFLOW_RUN_STATUS_CHOICES = ["running", "succeeded", "failed", "stopped", "partial-succeeded"] + + +def _parse_workflow_run_list_args(): + """ + Parse common arguments for workflow run list endpoints. + + Returns: + Parsed arguments containing last_id, limit, status, and triggered_from filters + """ + parser = reqparse.RequestParser() + parser.add_argument("last_id", type=uuid_value, location="args") + parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") + parser.add_argument( + "status", + type=str, + choices=WORKFLOW_RUN_STATUS_CHOICES, + location="args", + required=False, + ) + parser.add_argument( + "triggered_from", + type=str, + choices=["debugging", "app-run"], + location="args", + required=False, + help="Filter by trigger source: debugging or app-run", + ) + return parser.parse_args() + + +def _parse_workflow_run_count_args(): + """ + Parse common arguments for workflow run count endpoints. + + Returns: + Parsed arguments containing status, time_range, and triggered_from filters + """ + parser = reqparse.RequestParser() + parser.add_argument( + "status", + type=str, + choices=WORKFLOW_RUN_STATUS_CHOICES, + location="args", + required=False, + ) + parser.add_argument( + "time_range", + type=time_duration, + location="args", + required=False, + help="Time range filter (e.g., 7d, 4h, 30m, 30s)", + ) + parser.add_argument( + "triggered_from", + type=str, + choices=["debugging", "app-run"], + location="args", + required=False, + help="Filter by trigger source: debugging or app-run", + ) + return parser.parse_args() + @console_ns.route("/apps//advanced-chat/workflow-runs") class AdvancedChatAppWorkflowRunListApi(Resource): @@ -28,6 +92,7 @@ class AdvancedChatAppWorkflowRunListApi(Resource): @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"}) @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"}) @api.response(200, "Workflow runs retrieved successfully", advanced_chat_workflow_run_pagination_fields) @setup_required @login_required @@ -38,20 +103,19 @@ def get(self, app_model: App): """ Get advanced chat app workflow run list """ - parser = reqparse.RequestParser() - parser.add_argument("last_id", type=uuid_value, location="args") - parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument( - "status", - type=str, - choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], - location="args", - required=False, + args = _parse_workflow_run_list_args() + + # Default to DEBUGGING if not specified + triggered_from = ( + WorkflowRunTriggeredFrom(args.get("triggered_from")) + if args.get("triggered_from") + else WorkflowRunTriggeredFrom.DEBUGGING ) - args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_paginate_advanced_chat_workflow_runs(app_model=app_model, args=args) + result = workflow_run_service.get_paginate_advanced_chat_workflow_runs( + app_model=app_model, args=args, triggered_from=triggered_from + ) return result @@ -70,6 +134,7 @@ class AdvancedChatAppWorkflowRunCountApi(Resource): ) } ) + @api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"}) @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) @setup_required @login_required @@ -80,26 +145,21 @@ def get(self, app_model: App): """ Get advanced chat workflow runs count statistics """ - parser = reqparse.RequestParser() - parser.add_argument( - "status", - type=str, - choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], - location="args", - required=False, - ) - parser.add_argument( - "time_range", - type=time_duration, - location="args", - required=False, - help="Time range filter (e.g., 7d, 4h, 30m, 30s)", + args = _parse_workflow_run_count_args() + + # Default to DEBUGGING if not specified + triggered_from = ( + WorkflowRunTriggeredFrom(args.get("triggered_from")) + if args.get("triggered_from") + else WorkflowRunTriggeredFrom.DEBUGGING ) - args = parser.parse_args() workflow_run_service = WorkflowRunService() result = workflow_run_service.get_workflow_runs_count( - app_model=app_model, status=args.get("status"), time_range=args.get("time_range") + app_model=app_model, + status=args.get("status"), + time_range=args.get("time_range"), + triggered_from=triggered_from, ) return result @@ -112,6 +172,7 @@ class WorkflowRunListApi(Resource): @api.doc(params={"app_id": "Application ID"}) @api.doc(params={"last_id": "Last run ID for pagination", "limit": "Number of items per page (1-100)"}) @api.doc(params={"status": "Filter by status (optional): running, succeeded, failed, stopped, partial-succeeded"}) + @api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"}) @api.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_fields) @setup_required @login_required @@ -122,20 +183,19 @@ def get(self, app_model: App): """ Get workflow run list """ - parser = reqparse.RequestParser() - parser.add_argument("last_id", type=uuid_value, location="args") - parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") - parser.add_argument( - "status", - type=str, - choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], - location="args", - required=False, + args = _parse_workflow_run_list_args() + + # Default to DEBUGGING for workflow if not specified (backward compatibility) + triggered_from = ( + WorkflowRunTriggeredFrom(args.get("triggered_from")) + if args.get("triggered_from") + else WorkflowRunTriggeredFrom.DEBUGGING ) - args = parser.parse_args() workflow_run_service = WorkflowRunService() - result = workflow_run_service.get_paginate_workflow_runs(app_model=app_model, args=args) + result = workflow_run_service.get_paginate_workflow_runs( + app_model=app_model, args=args, triggered_from=triggered_from + ) return result @@ -154,6 +214,7 @@ class WorkflowRunCountApi(Resource): ) } ) + @api.doc(params={"triggered_from": "Filter by trigger source (optional): debugging or app-run. Default: debugging"}) @api.response(200, "Workflow runs count retrieved successfully", workflow_run_count_fields) @setup_required @login_required @@ -164,26 +225,21 @@ def get(self, app_model: App): """ Get workflow runs count statistics """ - parser = reqparse.RequestParser() - parser.add_argument( - "status", - type=str, - choices=["running", "succeeded", "failed", "stopped", "partial-succeeded"], - location="args", - required=False, - ) - parser.add_argument( - "time_range", - type=time_duration, - location="args", - required=False, - help="Time range filter (e.g., 7d, 4h, 30m, 30s)", + args = _parse_workflow_run_count_args() + + # Default to DEBUGGING for workflow if not specified (backward compatibility) + triggered_from = ( + WorkflowRunTriggeredFrom(args.get("triggered_from")) + if args.get("triggered_from") + else WorkflowRunTriggeredFrom.DEBUGGING ) - args = parser.parse_args() workflow_run_service = WorkflowRunService() result = workflow_run_service.get_workflow_runs_count( - app_model=app_model, status=args.get("status"), time_range=args.get("time_range") + app_model=app_model, + status=args.get("status"), + time_range=args.get("time_range"), + triggered_from=triggered_from, ) return result diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 85d555b437461c..68affb59f3fede 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -29,6 +29,7 @@ from sqlalchemy.orm import Session, sessionmaker from libs.infinite_scroll_pagination import InfiniteScrollPagination +from libs.time_parser import get_time_threshold from models.workflow import WorkflowRun from repositories.api_workflow_run_repository import APIWorkflowRunRepository @@ -136,8 +137,6 @@ def get_workflow_runs_count( """ Get workflow runs count statistics grouped by status. """ - from libs.time_parser import get_time_threshold - _initial_status_counts = { "running": 0, "succeeded": 0, @@ -165,10 +164,7 @@ def get_workflow_runs_count( count_stmt = select(func.count(WorkflowRun.id)).where(*base_conditions, WorkflowRun.status == status) total = session.scalar(count_stmt) or 0 - result = { - "total": total, - **_initial_status_counts, - } + result = {"total": total} | _initial_status_counts # Set the count for the filtered status if status in result: @@ -195,10 +191,7 @@ def get_workflow_runs_count( if status_val in status_counts: status_counts[status_val] = count - return { - "total": total, - **status_counts, - } + return {"total": total} | status_counts def get_expired_runs_batch( self, diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 9a69b301fc0ba2..5c8719b49946b4 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -26,13 +26,15 @@ def __init__(self): ) self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) - def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: + def get_paginate_advanced_chat_workflow_runs( + self, app_model: App, args: dict, triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING + ) -> InfiniteScrollPagination: """ Get advanced chat app workflow run list - Only return triggered_from == advanced_chat :param app_model: app model :param args: request args + :param triggered_from: workflow run triggered from (default: DEBUGGING for preview runs) """ class WorkflowWithMessage: @@ -45,7 +47,7 @@ def __init__(self, workflow_run: WorkflowRun): def __getattr__(self, item): return getattr(self._workflow_run, item) - pagination = self.get_paginate_workflow_runs(app_model, args) + pagination = self.get_paginate_workflow_runs(app_model, args, triggered_from) with_message_workflow_runs = [] for workflow_run in pagination.data: @@ -60,13 +62,15 @@ def __getattr__(self, item): pagination.data = with_message_workflow_runs return pagination - def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: + def get_paginate_workflow_runs( + self, app_model: App, args: dict, triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING + ) -> InfiniteScrollPagination: """ - Get debug workflow run list - Only return triggered_from == debugging + Get workflow run list :param app_model: app model :param args: request args + :param triggered_from: workflow run triggered from (default: DEBUGGING) """ limit = int(args.get("limit", 20)) last_id = args.get("last_id") @@ -75,7 +79,7 @@ def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScro return self._workflow_run_repo.get_paginated_workflow_runs( tenant_id=app_model.tenant_id, app_id=app_model.id, - triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + triggered_from=triggered_from, limit=limit, last_id=last_id, status=status, @@ -95,7 +99,11 @@ def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun | None: ) def get_workflow_runs_count( - self, app_model: App, status: str | None = None, time_range: str | None = None + self, + app_model: App, + status: str | None = None, + time_range: str | None = None, + triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING, ) -> dict[str, int]: """ Get workflow runs count statistics @@ -103,12 +111,13 @@ def get_workflow_runs_count( :param app_model: app model :param status: optional status filter :param time_range: optional time range filter (e.g., "7d", "4h", "30m", "30s") + :param triggered_from: workflow run triggered from (default: DEBUGGING) :return: dict with total and status counts """ return self._workflow_run_repo.get_workflow_runs_count( tenant_id=app_model.tenant_id, app_id=app_model.id, - triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + triggered_from=triggered_from, status=status, time_range=time_range, )