diff --git a/apps/sim/app/api/workflows/[id]/execute/route.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.test.ts index 237ddb4cfed..7be75d9f8ad 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.test.ts @@ -87,7 +87,7 @@ describe('Workflow Execution API Route', () => { })) vi.doMock('@/lib/workflows/db-helpers', () => ({ - loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + loadDeployedWorkflowState: vi.fn().mockResolvedValue({ blocks: { 'starter-id': { id: 'starter-id', @@ -121,7 +121,7 @@ describe('Workflow Execution API Route', () => { ], loops: {}, parallels: {}, - isFromNormalizedTables: true, + isFromNormalizedTables: false, // Changed to false since it's from deployed state }), })) @@ -516,7 +516,7 @@ describe('Workflow Execution API Route', () => { })) vi.doMock('@/lib/workflows/db-helpers', () => ({ - loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + loadDeployedWorkflowState: vi.fn().mockResolvedValue({ blocks: { 'starter-id': { id: 'starter-id', @@ -550,7 +550,7 @@ describe('Workflow Execution API Route', () => { ], loops: {}, parallels: {}, - isFromNormalizedTables: true, + isFromNormalizedTables: false, // Changed to false since it's from deployed state }), })) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 893a5efa68a..927ae1f067b 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -9,7 +9,7 @@ import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { decryptSecret } from '@/lib/utils' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' +import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' import { createHttpResponseFromBlock, updateWorkflowRunCounts, @@ -111,20 +111,13 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P runningExecutions.add(executionKey) logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`) - // Load workflow data from normalized tables - logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`) - const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + // Load workflow data from deployed state for API executions + const deployedData = await loadDeployedWorkflowState(workflowId) - if (!normalizedData) { - throw new Error( - `Workflow ${workflowId} has no normalized data available. Ensure the workflow is properly saved to normalized tables.` - ) - } - - // Use normalized data as primary source - const { blocks, edges, loops, parallels } = normalizedData - logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`) - logger.debug(`[${requestId}] Normalized data loaded:`, { + // Use deployed data as primary source for API executions + const { blocks, edges, loops, parallels } = deployedData + logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`) + logger.debug(`[${requestId}] Deployed data loaded:`, { blocksCount: Object.keys(blocks || {}).length, edgesCount: (edges || []).length, loopsCount: Object.keys(loops || {}).length, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx index 0a9907e271b..448eff7ba81 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx @@ -70,7 +70,10 @@ export function WorkflowBlock({ id, data }: NodeProps) { const currentWorkflow = useCurrentWorkflow() const currentBlock = currentWorkflow.getBlockById(id) - const isEnabled = currentBlock?.enabled ?? true + // In preview mode, use the blockState provided; otherwise use current workflow state + const isEnabled = data.isPreview + ? (data.blockState?.enabled ?? true) + : (currentBlock?.enabled ?? true) // Get diff status from the block itself (set by diff engine) const diffStatus = diff --git a/apps/sim/lib/workflows/db-helpers.ts b/apps/sim/lib/workflows/db-helpers.ts index ff4a334a558..2073dc65c88 100644 --- a/apps/sim/lib/workflows/db-helpers.ts +++ b/apps/sim/lib/workflows/db-helpers.ts @@ -1,8 +1,8 @@ import { eq } from 'drizzle-orm' import { createLogger } from '@/lib/logs/console/logger' import { db } from '@/db' -import { workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema' -import type { LoopConfig, WorkflowState } from '@/stores/workflows/workflow/types' +import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema' +import type { WorkflowState } from '@/stores/workflows/workflow/types' import { SUBFLOW_TYPES } from '@/stores/workflows/workflow/types' const logger = createLogger('WorkflowDBHelpers') @@ -12,7 +12,49 @@ export interface NormalizedWorkflowData { edges: any[] loops: Record parallels: Record - isFromNormalizedTables: true // Flag to indicate this came from new tables + isFromNormalizedTables: boolean // Flag to indicate source (true = normalized tables, false = deployed state) +} + +/** + * Load deployed workflow state for execution + * Returns deployed state if available, otherwise throws error + */ +export async function loadDeployedWorkflowState( + workflowId: string +): Promise { + try { + // First check if workflow is deployed and get deployed state + const [workflowResult] = await db + .select({ + isDeployed: workflow.isDeployed, + deployedState: workflow.deployedState, + }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + if (!workflowResult) { + throw new Error(`Workflow ${workflowId} not found`) + } + + if (!workflowResult.isDeployed || !workflowResult.deployedState) { + throw new Error(`Workflow ${workflowId} is not deployed or has no deployed state`) + } + + const deployedState = workflowResult.deployedState as any + + // Convert deployed state to normalized format + return { + blocks: deployedState.blocks || {}, + edges: deployedState.edges || [], + loops: deployedState.loops || {}, + parallels: deployedState.parallels || {}, + isFromNormalizedTables: false, // Flag to indicate this came from deployed state + } + } catch (error) { + logger.error(`Error loading deployed workflow state ${workflowId}:`, error) + throw error + } } /** @@ -88,7 +130,6 @@ export async function loadWorkflowFromNormalizedTables( const config = subflow.config || {} if (subflow.type === SUBFLOW_TYPES.LOOP) { - const loopConfig = config as LoopConfig loops[subflow.id] = { id: subflow.id, ...config, @@ -126,7 +167,7 @@ export async function saveWorkflowToNormalizedTables( ): Promise<{ success: boolean; jsonBlob?: any; error?: string }> { try { // Start a transaction - const result = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { // Clear existing data for this workflow await Promise.all([ tx.delete(workflowBlocks).where(eq(workflowBlocks.workflowId, workflowId)), diff --git a/apps/sim/trigger/workflow-execution.ts b/apps/sim/trigger/workflow-execution.ts index 3a84b91c68b..5a455310c35 100644 --- a/apps/sim/trigger/workflow-execution.ts +++ b/apps/sim/trigger/workflow-execution.ts @@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { decryptSecret } from '@/lib/utils' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' +import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { db } from '@/db' import { environment as environmentTable, userStats } from '@/db/schema' @@ -60,16 +60,10 @@ export const workflowExecution = task({ ) } - // Load workflow data from normalized tables - const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) - if (!normalizedData) { - logger.error(`[${requestId}] Workflow not found in normalized tables: ${workflowId}`) - throw new Error(`Workflow ${workflowId} data not found in normalized tables`) - } - - logger.info(`[${requestId}] Workflow loaded successfully: ${workflowId}`) + // Load workflow data from deployed state (this task is only used for API executions right now) + const workflowData = await loadDeployedWorkflowState(workflowId) - const { blocks, edges, loops, parallels } = normalizedData + const { blocks, edges, loops, parallels } = workflowData // Merge subblock states (server-safe version doesn't need workflowId) const mergedStates = mergeSubblockState(blocks, {})