From b50ab4d111563c4aa56cd34aa6985c5a37067c1f Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 19 Jan 2026 17:25:48 +0800 Subject: [PATCH 1/7] fix (workflow): Fixed workflow execution status update logic and added support for secure storage mode. Handled status update logic for workflow and node execution, ensuring terminal state is not overwritten by non-terminal state. Added support for secure storage mode parameters. Optimized database operations and added integrity error handling. --- api/tasks/workflow_execution_tasks.py | 72 ++++++++++++------- api/tasks/workflow_node_execution_tasks.py | 82 ++++++++++++++-------- 2 files changed, 99 insertions(+), 55 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index 7d145fb50c4ef7..59b9cc3cfc1f19 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -4,14 +4,15 @@ These tasks provide asynchronous storage capabilities for workflow execution data, improving performance by offloading storage operations to background workers. """ - import json import logging from celery import shared_task from sqlalchemy import select from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import IntegrityError +from core.security.ctx import set_mode from core.workflow.entities.workflow_execution import WorkflowExecution from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from extensions.ext_database import db @@ -30,6 +31,7 @@ def save_workflow_execution_task( triggered_from: str, creator_user_id: str, creator_user_role: str, + security_store_mode: str | None = None, ) -> bool: """ Asynchronously save or update a workflow execution to the database. @@ -50,31 +52,35 @@ def save_workflow_execution_task( session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) with session_factory() as session: - # Deserialize execution data + set_mode(security_store_mode or None) execution = WorkflowExecution.model_validate(execution_data) - - # Check if workflow run already exists existing_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == execution.id_)) - if existing_run: - # Update existing workflow run _update_workflow_run_from_execution(existing_run, execution) - logger.debug("Updated existing workflow run: %s", execution.id_) - else: - # Create new workflow run - workflow_run = _create_workflow_run_from_execution( - execution=execution, - tenant_id=tenant_id, - app_id=app_id, - triggered_from=WorkflowRunTriggeredFrom(triggered_from), - creator_user_id=creator_user_id, - creator_user_role=CreatorUserRole(creator_user_role), - ) + session.commit() + return True + workflow_run = _create_workflow_run_from_execution( + execution=execution, + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowRunTriggeredFrom(triggered_from), + creator_user_id=creator_user_id, + creator_user_role=CreatorUserRole(creator_user_role), + ) + try: session.add(workflow_run) - logger.debug("Created new workflow run: %s", execution.id_) - - session.commit() - return True + session.commit() + return True + except IntegrityError: + session.rollback() + existing_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == execution.id_)) + if existing_run: + _update_workflow_run_from_execution(existing_run, execution) + session.commit() + return True + session.add(workflow_run) + session.commit() + return True except Exception as e: logger.exception("Failed to save workflow execution %s", execution_data.get("id_", "unknown")) @@ -120,12 +126,24 @@ def _create_workflow_run_from_execution( return workflow_run -def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: WorkflowExecution): - """ - Update a WorkflowRun database model from a WorkflowExecution domain entity. - """ +def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: WorkflowExecution) -> None: json_converter = WorkflowRuntimeTypeConverter() - workflow_run.status = execution.status.value + terminal = {"succeeded", "failed", "stopped", "partial-succeeded"} + current_status = workflow_run.status + new_status = execution.status.value + if current_status in terminal: + if new_status in terminal: + workflow_run.status = new_status + workflow_run.outputs = ( + json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else workflow_run.outputs + ) + workflow_run.error = execution.error_message + workflow_run.elapsed_time = execution.elapsed_time + workflow_run.total_tokens = execution.total_tokens + workflow_run.total_steps = execution.total_steps + workflow_run.finished_at = workflow_run.finished_at or execution.finished_at + return + workflow_run.status = new_status workflow_run.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" ) @@ -133,4 +151,4 @@ def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: Wo workflow_run.elapsed_time = execution.elapsed_time workflow_run.total_tokens = execution.total_tokens workflow_run.total_steps = execution.total_steps - workflow_run.finished_at = execution.finished_at + workflow_run.finished_at = workflow_run.finished_at or execution.finished_at diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index 8f5127670fa824..575a5fe8120443 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -11,7 +11,9 @@ from celery import shared_task from sqlalchemy import select from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import IntegrityError +from core.security.ctx import set_mode from core.workflow.entities.workflow_node_execution import ( WorkflowNodeExecution, ) @@ -32,6 +34,7 @@ def save_workflow_node_execution_task( triggered_from: str, creator_user_id: str, creator_user_role: str, + security_store_mode: str | None = None, ) -> bool: """ Asynchronously save or update a workflow node execution to the database. @@ -52,33 +55,39 @@ def save_workflow_node_execution_task( session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) with session_factory() as session: - # Deserialize execution data + set_mode(security_store_mode or None) execution = WorkflowNodeExecution.model_validate(execution_data) - - # Check if node execution already exists existing_execution = session.scalar( select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id) ) - if existing_execution: - # Update existing node execution _update_node_execution_from_domain(existing_execution, execution) - logger.debug("Updated existing workflow node execution: %s", execution.id) - else: - # Create new node execution - node_execution = _create_node_execution_from_domain( - execution=execution, - tenant_id=tenant_id, - app_id=app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom(triggered_from), - creator_user_id=creator_user_id, - creator_user_role=CreatorUserRole(creator_user_role), + session.commit() + return True + node_execution = _create_node_execution_from_domain( + execution=execution, + tenant_id=tenant_id, + app_id=app_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom(triggered_from), + creator_user_id=creator_user_id, + creator_user_role=CreatorUserRole(creator_user_role), + ) + try: + session.add(node_execution) + session.commit() + return True + except IntegrityError: + session.rollback() + existing_execution = session.scalar( + select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id) ) + if existing_execution: + _update_node_execution_from_domain(existing_execution, execution) + session.commit() + return True session.add(node_execution) - logger.debug("Created new workflow node execution: %s", execution.id) - - session.commit() - return True + session.commit() + return True except Exception as e: logger.exception("Failed to save workflow node execution %s", execution_data.get("id", "unknown")) @@ -141,11 +150,31 @@ def _create_node_execution_from_domain( def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionModel, execution: WorkflowNodeExecution): - """ - Update a WorkflowNodeExecutionModel database model from a WorkflowNodeExecution domain entity. - """ - # Update serialized data json_converter = WorkflowRuntimeTypeConverter() + terminal = {"succeeded", "failed", "exception"} + current_status = node_execution.status + new_status = execution.status.value + if current_status in terminal: + if new_status in terminal: + node_execution.status = new_status + node_execution.inputs = ( + json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else node_execution.inputs + ) + node_execution.process_data = ( + json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else node_execution.process_data + ) + node_execution.outputs = ( + json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else node_execution.outputs + ) + if execution.metadata: + metadata_for_json = { + key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items() + } + node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json)) + node_execution.error = execution.error + node_execution.elapsed_time = execution.elapsed_time + node_execution.finished_at = node_execution.finished_at or execution.finished_at + return node_execution.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else "{}" node_execution.process_data = ( json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else "{}" @@ -153,7 +182,6 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode node_execution.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" ) - # Convert metadata enum keys to strings for JSON serialization if execution.metadata: metadata_for_json = { key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items() @@ -161,9 +189,7 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json)) else: node_execution.execution_metadata = "{}" - - # Update other fields - node_execution.status = execution.status.value + node_execution.status = new_status node_execution.error = execution.error node_execution.elapsed_time = execution.elapsed_time - node_execution.finished_at = execution.finished_at + node_execution.finished_at = node_execution.finished_at or execution.finished_at From 9699990c342c7c91d14261146583c645e084d783 Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 19 Jan 2026 17:54:03 +0800 Subject: [PATCH 2/7] =?UTF-8?q?fix(workflow):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E7=BB=88=E7=AB=AF=E7=8A=B6=E6=80=81=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=9B=B4=E6=96=B0=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 处理工作流执行和工作流节点执行更新时,确保终端状态不会被非终端状态覆盖 当插入出现IntegrityError时,改为记录警告并抛出异常让Celery重试 --- api/tasks/workflow_execution_tasks.py | 24 ++++++------ api/tasks/workflow_node_execution_tasks.py | 45 ++++++++++------------ 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index 59b9cc3cfc1f19..5b1d90e8a63ba6 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -78,9 +78,12 @@ def save_workflow_execution_task( _update_workflow_run_from_execution(existing_run, execution) session.commit() return True - session.add(workflow_run) - session.commit() - return True + # This case is rare. Let Celery's retry mechanism handle it. + logger.warning( + "IntegrityError on insert but record with id %s not found after rollback. Task will be retried.", + execution.id_ + ) + raise except Exception as e: logger.exception("Failed to save workflow execution %s", execution_data.get("id_", "unknown")) @@ -131,18 +134,13 @@ def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: Wo terminal = {"succeeded", "failed", "stopped", "partial-succeeded"} current_status = workflow_run.status new_status = execution.status.value - if current_status in terminal: - if new_status in terminal: - workflow_run.status = new_status - workflow_run.outputs = ( - json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else workflow_run.outputs - ) - workflow_run.error = execution.error_message - workflow_run.elapsed_time = execution.elapsed_time - workflow_run.total_tokens = execution.total_tokens - workflow_run.total_steps = execution.total_steps + + if current_status in terminal and new_status not in terminal: + # If current status is terminal, do not update to a non-terminal status. + # Only update finished_at if it's not set. workflow_run.finished_at = workflow_run.finished_at or execution.finished_at return + workflow_run.status = new_status workflow_run.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index 575a5fe8120443..30c4b53b310c78 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -85,9 +85,12 @@ def save_workflow_node_execution_task( _update_node_execution_from_domain(existing_execution, execution) session.commit() return True - session.add(node_execution) - session.commit() - return True + # This case is rare. Let Celery's retry mechanism handle it. + logger.warning( + "IntegrityError on insert but record with id %s not found after rollback. Task will be retried.", + execution.id + ) + raise except Exception as e: logger.exception("Failed to save workflow node execution %s", execution_data.get("id", "unknown")) @@ -154,34 +157,26 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode terminal = {"succeeded", "failed", "exception"} current_status = node_execution.status new_status = execution.status.value - if current_status in terminal: - if new_status in terminal: - node_execution.status = new_status - node_execution.inputs = ( - json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else node_execution.inputs - ) - node_execution.process_data = ( - json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else node_execution.process_data - ) - node_execution.outputs = ( - json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else node_execution.outputs - ) - if execution.metadata: - metadata_for_json = { - key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items() - } - node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json)) - node_execution.error = execution.error - node_execution.elapsed_time = execution.elapsed_time + + if current_status in terminal and new_status not in terminal: + # If current status is terminal, do not update to a non-terminal status. + # Only update finished_at if it's not set. node_execution.finished_at = node_execution.finished_at or execution.finished_at return - node_execution.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else "{}" + + node_execution.status = new_status + node_execution.inputs = ( + json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else node_execution.inputs + ) node_execution.process_data = ( - json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else "{}" + json.dumps(json_converter.to_json_encodable(execution.process_data)) + if execution.process_data + else node_execution.process_data ) node_execution.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" ) + if execution.metadata: metadata_for_json = { key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items() @@ -189,7 +184,7 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json)) else: node_execution.execution_metadata = "{}" - node_execution.status = new_status + node_execution.error = execution.error node_execution.elapsed_time = execution.elapsed_time node_execution.finished_at = node_execution.finished_at or execution.finished_at From c6ab856c69f7e5fbba63cdc8ccd11c11040f3bdc Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 19 Jan 2026 17:57:35 +0800 Subject: [PATCH 3/7] =?UTF-8?q?refactor(security):=20=E7=A7=BB=E9=99=A4wor?= =?UTF-8?q?kflow=E4=BB=BB=E5=8A=A1=E4=B8=AD=E4=B8=8D=E5=86=8D=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=9A=84set=5Fmode=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/tasks/workflow_execution_tasks.py | 2 -- api/tasks/workflow_node_execution_tasks.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index 5b1d90e8a63ba6..f894e26e1d16d0 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -12,7 +12,6 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError -from core.security.ctx import set_mode from core.workflow.entities.workflow_execution import WorkflowExecution from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from extensions.ext_database import db @@ -52,7 +51,6 @@ def save_workflow_execution_task( session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) with session_factory() as session: - set_mode(security_store_mode or None) execution = WorkflowExecution.model_validate(execution_data) existing_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == execution.id_)) if existing_run: diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index 30c4b53b310c78..0530740a95ddd1 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -13,7 +13,6 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError -from core.security.ctx import set_mode from core.workflow.entities.workflow_node_execution import ( WorkflowNodeExecution, ) @@ -55,7 +54,6 @@ def save_workflow_node_execution_task( session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) with session_factory() as session: - set_mode(security_store_mode or None) execution = WorkflowNodeExecution.model_validate(execution_data) existing_execution = session.scalar( select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id) From f993bcd6ea79f614358c0868a0ca3d44370d7afd Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 19 Jan 2026 18:04:17 +0800 Subject: [PATCH 4/7] =?UTF-8?q?refactor(workflow=5Fexecution=5Ftasks):=20?= =?UTF-8?q?=E7=A7=BB=E9=99=A4=E6=9C=AA=E4=BD=BF=E7=94=A8=E7=9A=84security?= =?UTF-8?q?=5Fstore=5Fmode=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/tasks/workflow_execution_tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index f894e26e1d16d0..f84e9683bdca22 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -30,7 +30,6 @@ def save_workflow_execution_task( triggered_from: str, creator_user_id: str, creator_user_role: str, - security_store_mode: str | None = None, ) -> bool: """ Asynchronously save or update a workflow execution to the database. From 1f2c7fe3a497baa7bf61739a86f9429171a4ef49 Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 19 Jan 2026 18:20:37 +0800 Subject: [PATCH 5/7] =?UTF-8?q?refactor(workflow):=20=E6=8F=90=E5=8F=96?= =?UTF-8?q?=E7=BB=88=E7=AB=AF=E7=8A=B6=E6=80=81=E4=B8=BA=E5=B8=B8=E9=87=8F?= =?UTF-8?q?=E5=B9=B6=E4=BC=98=E5=8C=96=E7=A9=BA=E5=80=BC=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将工作流执行和节点执行中的终端状态集合提取为模块级常量,提高代码可读性和维护性 优化输入和输出数据的空值检查,使用显式的is not None判断 --- api/tasks/workflow_execution_tasks.py | 6 ++++-- api/tasks/workflow_node_execution_tasks.py | 13 ++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index f84e9683bdca22..f7b67dd5e1512a 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -126,13 +126,15 @@ def _create_workflow_run_from_execution( return workflow_run +WORKFLOW_TERMINAL_STATES = {"succeeded", "failed", "stopped", "partial-succeeded"} + + def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: WorkflowExecution) -> None: json_converter = WorkflowRuntimeTypeConverter() - terminal = {"succeeded", "failed", "stopped", "partial-succeeded"} current_status = workflow_run.status new_status = execution.status.value - if current_status in terminal and new_status not in terminal: + if current_status in WORKFLOW_TERMINAL_STATES and new_status not in WORKFLOW_TERMINAL_STATES: # If current status is terminal, do not update to a non-terminal status. # Only update finished_at if it's not set. workflow_run.finished_at = workflow_run.finished_at or execution.finished_at diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index 0530740a95ddd1..11ee8db4caab2f 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -33,7 +33,6 @@ def save_workflow_node_execution_task( triggered_from: str, creator_user_id: str, creator_user_role: str, - security_store_mode: str | None = None, ) -> bool: """ Asynchronously save or update a workflow node execution to the database. @@ -150,13 +149,15 @@ def _create_node_execution_from_domain( return node_execution +NODE_TERMINAL_STATES = {"succeeded", "failed", "exception"} + + def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionModel, execution: WorkflowNodeExecution): json_converter = WorkflowRuntimeTypeConverter() - terminal = {"succeeded", "failed", "exception"} current_status = node_execution.status new_status = execution.status.value - if current_status in terminal and new_status not in terminal: + if current_status in NODE_TERMINAL_STATES and new_status not in NODE_TERMINAL_STATES: # If current status is terminal, do not update to a non-terminal status. # Only update finished_at if it's not set. node_execution.finished_at = node_execution.finished_at or execution.finished_at @@ -164,11 +165,13 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode node_execution.status = new_status node_execution.inputs = ( - json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else node_execution.inputs + json.dumps(json_converter.to_json_encodable(execution.inputs)) + if execution.inputs is not None + else node_execution.inputs ) node_execution.process_data = ( json.dumps(json_converter.to_json_encodable(execution.process_data)) - if execution.process_data + if execution.process_data is not None else node_execution.process_data ) node_execution.outputs = ( From 4c1b3354d8690d472b7793c0963d338e3c368ef8 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 19 Jan 2026 10:24:56 +0000 Subject: [PATCH 6/7] [autofix.ci] apply automated fixes --- api/tasks/workflow_execution_tasks.py | 5 +++-- api/tasks/workflow_node_execution_tasks.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index f7b67dd5e1512a..57d3b48a48a1c1 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -4,13 +4,14 @@ These tasks provide asynchronous storage capabilities for workflow execution data, improving performance by offloading storage operations to background workers. """ + import json import logging from celery import shared_task from sqlalchemy import select -from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker from core.workflow.entities.workflow_execution import WorkflowExecution from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter @@ -78,7 +79,7 @@ def save_workflow_execution_task( # This case is rare. Let Celery's retry mechanism handle it. logger.warning( "IntegrityError on insert but record with id %s not found after rollback. Task will be retried.", - execution.id_ + execution.id_, ) raise diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index 11ee8db4caab2f..863199c21ee461 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -10,8 +10,8 @@ from celery import shared_task from sqlalchemy import select -from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker from core.workflow.entities.workflow_node_execution import ( WorkflowNodeExecution, @@ -85,7 +85,7 @@ def save_workflow_node_execution_task( # This case is rare. Let Celery's retry mechanism handle it. logger.warning( "IntegrityError on insert but record with id %s not found after rollback. Task will be retried.", - execution.id + execution.id, ) raise From 5b99c231085b4a1f287393f75f60439383e5fc42 Mon Sep 17 00:00:00 2001 From: 01393547 Date: Mon, 26 Jan 2026 14:57:16 +0800 Subject: [PATCH 7/7] =?UTF-8?q?refactor(db):=20=E7=A7=BB=E9=99=A4=E6=9C=AA?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=9A=84sessionmaker=E5=AF=BC=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 清理两个任务文件中未使用的sqlalchemy.orm.sessionmaker导入,简化依赖并提高代码整洁度 --- api/tasks/workflow_execution_tasks.py | 1 - api/tasks/workflow_node_execution_tasks.py | 1 - 2 files changed, 2 deletions(-) diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index dc906ec19ebc55..b8266dc8a8ea6b 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -11,7 +11,6 @@ from celery import shared_task from sqlalchemy import select from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import sessionmaker from core.db.session_factory import session_factory from core.workflow.entities.workflow_execution import WorkflowExecution diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index d7e4ec9edecfc6..6f74a5117e458b 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -11,7 +11,6 @@ from celery import shared_task from sqlalchemy import select from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import sessionmaker from core.db.session_factory import session_factory from core.workflow.entities.workflow_node_execution import (