From 81a7af858b904e3fad6069b11e7992fdafce673f Mon Sep 17 00:00:00 2001 From: OneStepAt4time Date: Sat, 11 Apr 2026 18:24:36 +0200 Subject: [PATCH] fix: enforce deterministic pipeline persistence failure handling --- src/__tests__/pipeline.test.ts | 42 ++++++++++++++++++++++-- src/pipeline.ts | 60 +++++++++++++++++++++++++--------- 2 files changed, 84 insertions(+), 18 deletions(-) diff --git a/src/__tests__/pipeline.test.ts b/src/__tests__/pipeline.test.ts index 7620ba43..386d364b 100644 --- a/src/__tests__/pipeline.test.ts +++ b/src/__tests__/pipeline.test.ts @@ -1755,7 +1755,7 @@ describe('PipelineManager', () => { await manager.destroy(); }); - it('persist is non-fatal when stateDir does not exist', async () => { + it('fails creation when initial persistence cannot be written', async () => { const manager = new PipelineManager(sessions.mock, eventBus.mock, '/nonexistent/path/that/does/not/exist'); const config: PipelineConfig = { name: 'bad-dir', @@ -1766,8 +1766,44 @@ describe('PipelineManager', () => { sessions.createSession.mockResolvedValue(makeMockSession('s1')); sessions.sendInitialPrompt.mockResolvedValue({ delivered: true, attempts: 1 }); - // Should not throw — write failure is non-fatal - await expect(manager.createPipeline(config)).resolves.toBeDefined(); + await expect(manager.createPipeline(config)).rejects.toThrow( + /Failed to persist pipeline state on creation:/, + ); + + await manager.destroy(); + }); + + it('marks running pipeline as failed when persistence later fails', async () => { + const manager = new PipelineManager(sessions.mock, eventBus.mock, tmpDir); + const config: PipelineConfig = { + name: 'runtime-persist-fail', + workDir: '/app', + stages: [{ name: 'A', prompt: 'run', dependsOn: [] }], + }; + + sessions.createSession.mockResolvedValue(makeMockSession('s1')); + sessions.sendInitialPrompt.mockResolvedValue({ delivered: true, attempts: 1 }); + + const pipeline = await manager.createPipeline(config); + + // Force a persistence failure after successful creation. + (manager as unknown as { stateDir: string }).stateDir = '/nonexistent/path/that/does/not/exist'; + + sessions.getSession.mockReturnValue(makeMockSession('s1', { status: 'idle' })); + await (manager as unknown as { pollPipelines: () => Promise }).pollPipelines(); + + expect(pipeline.status).toBe('failed'); + expect(pipeline.currentStage).toBe('fix'); + const stage = pipeline.stages.find(s => s.name === 'A'); + expect(stage?.status).toBe('completed'); + const persistenceFailureOutput = pipeline.stageHistory + .map(history => history.output) + .find(output => { + if (!output || typeof output !== 'object') return false; + return Reflect.get(output, 'reason') === 'persistence_failed'; + }); + expect(persistenceFailureOutput).toBeDefined(); + await manager.destroy(); }); }); diff --git a/src/pipeline.ts b/src/pipeline.ts index 0b3ffefe..efb0bfa4 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -89,6 +89,8 @@ export class PipelineManager { private pollInterval: NodeJS.Timeout | null = null; private cleanupTimers = new Map(); // #1092: track cleanup timers per pipeline + private readonly PERSISTENCE_FAILED_REASON = 'persistence_failed'; + constructor( private sessions: SessionManager, private eventBus?: SessionEventBus, @@ -177,7 +179,12 @@ export class PipelineManager { this.pipelines.set(id, pipeline); this.pipelineConfigs.set(id, config); // #219: store original config for polling - await this.persistPipelines(); // #1424: persist on creation + const persisted = await this.persistPipelines(); // #1424: persist on creation + if (!persisted.ok) { + this.pipelines.delete(id); + this.pipelineConfigs.delete(id); + throw new Error(`Failed to persist pipeline state on creation: ${persisted.error}`); + } // Start stages with no dependencies immediately await this.advancePipeline(id, config); @@ -266,13 +273,13 @@ export class PipelineManager { stage.status = 'running'; stage.startedAt = Date.now(); this.transitionPipelineStage(pipeline, 'execute', { stage: stage.name, sessionId: session.id }); - await this.persistPipelines(); // #1424: persist after stage starts + await this.persistOrFailPipeline(pipeline, 'stage_start'); // #1424: persist after stage starts } catch (e: unknown) { stage.status = 'failed'; stage.error = getErrorMessage(e); pipeline.status = 'failed'; this.transitionPipelineStage(pipeline, 'fix', { stage: stage.name, error: stage.error }); - await this.persistPipelines(); // #1424: persist after stage fails + await this.persistOrFailPipeline(pipeline, 'stage_fail'); // #1424: persist after stage fails } } @@ -309,7 +316,7 @@ export class PipelineManager { if (!session) { stage.status = 'failed'; stage.error = 'Session disappeared'; - await this.persistPipelines(); // #1424: persist after orphaned stage detected + await this.persistOrFailPipeline(pipeline, 'orphaned_stage'); // #1424: persist after orphaned stage detected continue; } @@ -322,7 +329,7 @@ export class PipelineManager { stage.status = 'failed'; stage.error = 'stage_timeout'; this.transitionPipelineStage(pipeline, 'fix', { stage: stage.name, reason: 'stage_timeout' }); - await this.persistPipelines(); // #1424: persist after stage times out + await this.persistOrFailPipeline(pipeline, 'stage_timeout'); // #1424: persist after stage times out continue; } } @@ -331,7 +338,7 @@ export class PipelineManager { stage.status = 'completed'; stage.completedAt = Date.now(); this.transitionPipelineStage(pipeline, 'verify', { stageCompleted: stage.name }); - await this.persistPipelines(); // #1424: persist after stage completes + await this.persistOrFailPipeline(pipeline, 'stage_complete'); // #1424: persist after stage completes } } @@ -343,7 +350,7 @@ export class PipelineManager { // #1424: Persist after advancePipeline may have transitioned pipeline to completed/failed if (pipeline.status !== 'running') { - await this.persistPipelines(); + await this.persistOrFailPipeline(pipeline, 'pipeline_terminal_transition'); } // #221: Clean up completed/failed pipelines after 30s to avoid memory leak @@ -420,8 +427,8 @@ export class PipelineManager { /** #1424: Persist running pipelines to disk using atomic-rename. * When no running pipelines remain, delete the state file so hydrate() * does not restore stale completed/failed entries on restart. */ - private async persistPipelines(): Promise { - if (!this.stateDir) return; + private async persistPipelines(): Promise<{ ok: true } | { ok: false; error: string }> { + if (!this.stateDir) return { ok: true }; // Only persist running pipelines — completed/failed are cleaned up by timers const running = Array.from(this.pipelines.values()).filter(p => p.status === 'running'); @@ -429,8 +436,15 @@ export class PipelineManager { if (running.length === 0) { // No running pipelines — remove stale state file - try { await unlink(file); } catch { /* already gone or never created */ } - return; + try { + await unlink(file); + } catch (error: unknown) { + const code = typeof error === 'object' && error !== null ? Reflect.get(error, 'code') : undefined; + if (code !== 'ENOENT') { + return { ok: false, error: `failed to delete ${file}: ${getErrorMessage(error)}` }; + } + } + return { ok: true }; } // Include config alongside pipeline state so we can restore full stage details on hydration @@ -445,15 +459,31 @@ export class PipelineManager { const tmpFile = `${file}.tmp`; try { await writeFile(tmpFile, JSON.stringify(entries, null, 2)); - } catch { - // Write failed — skip persistence (non-fatal) - return; + } catch (error: unknown) { + return { ok: false, error: `failed to write ${tmpFile}: ${getErrorMessage(error)}` }; } try { await rename(tmpFile, file); - } catch { + } catch (error: unknown) { // Rename failed — remove tmp file try { await unlink(tmpFile); } catch { /* ignore */ } + return { ok: false, error: `failed to rename ${tmpFile} to ${file}: ${getErrorMessage(error)}` }; + } + + return { ok: true }; + } + + private async persistOrFailPipeline(pipeline: PipelineState, operation: string): Promise { + const persisted = await this.persistPipelines(); + if (persisted.ok) return; + + if (pipeline.status === 'running') { + pipeline.status = 'failed'; + this.transitionPipelineStage(pipeline, 'fix', { + reason: this.PERSISTENCE_FAILED_REASON, + operation, + error: persisted.error, + }); } }