Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/__tests__/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ describe('PipelineManager', () => {
await manager.destroy();
});

it('persist is non-fatal when stateDir does not exist', async () => {
it('fails creation when initial persistence cannot be written', async () => {
const manager = new PipelineManager(sessions.mock, eventBus.mock, '/nonexistent/path/that/does/not/exist');
const config: PipelineConfig = {
name: 'bad-dir',
Expand All @@ -1766,8 +1766,44 @@ describe('PipelineManager', () => {
sessions.createSession.mockResolvedValue(makeMockSession('s1'));
sessions.sendInitialPrompt.mockResolvedValue({ delivered: true, attempts: 1 });

// Should not throw — write failure is non-fatal
await expect(manager.createPipeline(config)).resolves.toBeDefined();
await expect(manager.createPipeline(config)).rejects.toThrow(
/Failed to persist pipeline state on creation:/,
);

await manager.destroy();
});

it('marks running pipeline as failed when persistence later fails', async () => {
  // Narrow view over the private members this test has to reach into.
  type ManagerInternals = { stateDir: string; pollPipelines: () => Promise<void> };

  const manager = new PipelineManager(sessions.mock, eventBus.mock, tmpDir);
  const internals = manager as unknown as ManagerInternals;

  sessions.createSession.mockResolvedValue(makeMockSession('s1'));
  sessions.sendInitialPrompt.mockResolvedValue({ delivered: true, attempts: 1 });

  // Creation runs against the writable tmpDir, so the initial persist succeeds.
  const singleStageConfig: PipelineConfig = {
    name: 'runtime-persist-fail',
    workDir: '/app',
    stages: [{ name: 'A', prompt: 'run', dependsOn: [] }],
  };
  const pipeline = await manager.createPipeline(singleStageConfig);

  // Point the manager at an unwritable directory so the next persist fails.
  internals.stateDir = '/nonexistent/path/that/does/not/exist';

  // An idle session lets the poll complete stage A, which triggers a persist.
  sessions.getSession.mockReturnValue(makeMockSession('s1', { status: 'idle' }));
  await internals.pollPipelines();

  // The pipeline is failed and moved to 'fix', while the stage keeps its own result.
  expect(pipeline.status).toBe('failed');
  expect(pipeline.currentStage).toBe('fix');
  const stageA = pipeline.stages.find(candidate => candidate.name === 'A');
  expect(stageA?.status).toBe('completed');

  // The failure reason must be recorded in one of the history outputs.
  const recordedOutputs = pipeline.stageHistory.map(entry => entry.output);
  const persistenceFailureOutput = recordedOutputs.find(
    payload =>
      Boolean(payload) &&
      typeof payload === 'object' &&
      payload !== null &&
      Reflect.get(payload, 'reason') === 'persistence_failed',
  );
  expect(persistenceFailureOutput).toBeDefined();

  await manager.destroy();
});
});
Expand Down
60 changes: 45 additions & 15 deletions src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export class PipelineManager {
private pollInterval: NodeJS.Timeout | null = null;
private cleanupTimers = new Map<string, NodeJS.Timeout>(); // #1092: track cleanup timers per pipeline

private readonly PERSISTENCE_FAILED_REASON = 'persistence_failed';

constructor(
private sessions: SessionManager,
private eventBus?: SessionEventBus,
Expand Down Expand Up @@ -177,7 +179,12 @@ export class PipelineManager {

this.pipelines.set(id, pipeline);
this.pipelineConfigs.set(id, config); // #219: store original config for polling
await this.persistPipelines(); // #1424: persist on creation
const persisted = await this.persistPipelines(); // #1424: persist on creation
if (!persisted.ok) {
this.pipelines.delete(id);
this.pipelineConfigs.delete(id);
throw new Error(`Failed to persist pipeline state on creation: ${persisted.error}`);
}

// Start stages with no dependencies immediately
await this.advancePipeline(id, config);
Expand Down Expand Up @@ -266,13 +273,13 @@ export class PipelineManager {
stage.status = 'running';
stage.startedAt = Date.now();
this.transitionPipelineStage(pipeline, 'execute', { stage: stage.name, sessionId: session.id });
await this.persistPipelines(); // #1424: persist after stage starts
await this.persistOrFailPipeline(pipeline, 'stage_start'); // #1424: persist after stage starts
} catch (e: unknown) {
stage.status = 'failed';
stage.error = getErrorMessage(e);
pipeline.status = 'failed';
this.transitionPipelineStage(pipeline, 'fix', { stage: stage.name, error: stage.error });
await this.persistPipelines(); // #1424: persist after stage fails
await this.persistOrFailPipeline(pipeline, 'stage_fail'); // #1424: persist after stage fails
}
}

Expand Down Expand Up @@ -309,7 +316,7 @@ export class PipelineManager {
if (!session) {
stage.status = 'failed';
stage.error = 'Session disappeared';
await this.persistPipelines(); // #1424: persist after orphaned stage detected
await this.persistOrFailPipeline(pipeline, 'orphaned_stage'); // #1424: persist after orphaned stage detected
continue;
}

Expand All @@ -322,7 +329,7 @@ export class PipelineManager {
stage.status = 'failed';
stage.error = 'stage_timeout';
this.transitionPipelineStage(pipeline, 'fix', { stage: stage.name, reason: 'stage_timeout' });
await this.persistPipelines(); // #1424: persist after stage times out
await this.persistOrFailPipeline(pipeline, 'stage_timeout'); // #1424: persist after stage times out
continue;
}
}
Expand All @@ -331,7 +338,7 @@ export class PipelineManager {
stage.status = 'completed';
stage.completedAt = Date.now();
this.transitionPipelineStage(pipeline, 'verify', { stageCompleted: stage.name });
await this.persistPipelines(); // #1424: persist after stage completes
await this.persistOrFailPipeline(pipeline, 'stage_complete'); // #1424: persist after stage completes
}
}

Expand All @@ -343,7 +350,7 @@ export class PipelineManager {

// #1424: Persist after advancePipeline may have transitioned pipeline to completed/failed
if (pipeline.status !== 'running') {
await this.persistPipelines();
await this.persistOrFailPipeline(pipeline, 'pipeline_terminal_transition');
}

// #221: Clean up completed/failed pipelines after 30s to avoid memory leak
Expand Down Expand Up @@ -420,17 +427,24 @@ export class PipelineManager {
/** #1424: Persist running pipelines to disk using atomic-rename.
* When no running pipelines remain, delete the state file so hydrate()
* does not restore stale completed/failed entries on restart. */
private async persistPipelines(): Promise<void> {
if (!this.stateDir) return;
private async persistPipelines(): Promise<{ ok: true } | { ok: false; error: string }> {
if (!this.stateDir) return { ok: true };

// Only persist running pipelines — completed/failed are cleaned up by timers
const running = Array.from(this.pipelines.values()).filter(p => p.status === 'running');
const file = join(this.stateDir, 'pipelines.json');

if (running.length === 0) {
// No running pipelines — remove stale state file
try { await unlink(file); } catch { /* already gone or never created */ }
return;
try {
await unlink(file);
} catch (error: unknown) {
const code = typeof error === 'object' && error !== null ? Reflect.get(error, 'code') : undefined;
if (code !== 'ENOENT') {
return { ok: false, error: `failed to delete ${file}: ${getErrorMessage(error)}` };
}
}
return { ok: true };
}

// Include config alongside pipeline state so we can restore full stage details on hydration
Expand All @@ -445,15 +459,31 @@ export class PipelineManager {
const tmpFile = `${file}.tmp`;
try {
await writeFile(tmpFile, JSON.stringify(entries, null, 2));
} catch {
// Write failed — skip persistence (non-fatal)
return;
} catch (error: unknown) {
return { ok: false, error: `failed to write ${tmpFile}: ${getErrorMessage(error)}` };
}
try {
await rename(tmpFile, file);
} catch {
} catch (error: unknown) {
// Rename failed — remove tmp file
try { await unlink(tmpFile); } catch { /* ignore */ }
return { ok: false, error: `failed to rename ${tmpFile} to ${file}: ${getErrorMessage(error)}` };
}

return { ok: true };
}

/**
 * Persist pipeline state and, if the write fails, fail the given pipeline.
 *
 * Calls persistPipelines(). On a failed result, a pipeline that is still
 * 'running' has its status set to 'failed' and a 'fix' transition is recorded
 * carrying the failure reason, the triggering operation and the error text.
 * For a pipeline in any other status the persistence error is dropped here
 * without further handling.
 *
 * @param pipeline - Pipeline whose state change triggered the persist.
 * @param operation - Label of the triggering step, copied into the transition payload.
 */
private async persistOrFailPipeline(pipeline: PipelineState, operation: string): Promise<void> {
  const outcome = await this.persistPipelines();

  if (!outcome.ok && pipeline.status === 'running') {
    pipeline.status = 'failed';
    this.transitionPipelineStage(pipeline, 'fix', {
      reason: this.PERSISTENCE_FAILED_REASON,
      operation,
      error: outcome.error,
    });
  }
}

Expand Down
Loading