diff --git a/packages/squad-sdk/src/agents/history-shadow.ts b/packages/squad-sdk/src/agents/history-shadow.ts index ce5e8ea04..696940d52 100644 --- a/packages/squad-sdk/src/agents/history-shadow.ts +++ b/packages/squad-sdk/src/agents/history-shadow.ts @@ -11,6 +11,81 @@ import * as fs from 'fs/promises'; import * as path from 'path'; import { ConfigurationError } from '../adapter/errors.js'; +// --------------------------------------------------------------------------- +// Async mutex — prevents read-modify-write races when multiple agents +// concurrently append to the same history.md (#479). +// +// Two layers of protection: +// 1. In-process async mutex (handles concurrent agents in one Node.js process) +// 2. Atomic writes via temp-file + rename (prevents partial reads) +// --------------------------------------------------------------------------- + +/** + * Per-file async mutex. Keyed by resolved file path so concurrent calls + * targeting the same history.md are serialized, while calls to different + * files run in parallel. + * @private + */ +const fileLocks = new Map>(); + +/** + * Execute `fn` while holding an in-process async mutex for `filePath`. + * + * Concurrent callers for the same path are queued — each waits for the + * previous to finish before starting. Different paths run in parallel. + * + * @private + */ +async function withFileLock( + filePath: string, + fn: () => Promise, +): Promise { + // Normalize path to prevent two representations of the same file + const key = path.resolve(filePath); + + // Chain: wait for whoever holds the lock right now + const prev = fileLocks.get(key) ?? Promise.resolve(); + + let releaseLock!: () => void; + const gate = new Promise(resolve => { + releaseLock = resolve; + }); + fileLocks.set(key, gate); + + // Wait until the previous holder finishes + await prev; + + try { + return await fn(); + } finally { + releaseLock(); + // Clean up if we are the last in the chain + if (fileLocks.get(key) === gate) { + fileLocks.delete(key); + } + } +} + +/** + * Write a file atomically by writing to a temp file then renaming. + * Prevents concurrent readers from seeing partial content. + * @private + */ +async function atomicWriteFile( + filePath: string, + content: string, +): Promise { + const tmpPath = `${filePath}.${process.pid}.tmp`; + try { + await fs.writeFile(tmpPath, content, 'utf-8'); + await fs.rename(tmpPath, filePath); + } catch (err) { + // Clean up temp file on failure + await fs.unlink(tmpPath).catch(() => {}); + throw err; + } +} + /** * Standard history sections that agents maintain. */ @@ -147,45 +222,49 @@ export async function appendToHistory( try { const shadowPath = path.join(teamRoot, '.squad', 'agents', agentName, 'history.md'); - // Read existing history - let historyContent: string; - try { - historyContent = await fs.readFile(shadowPath, 'utf-8'); - } catch (error) { - throw new ConfigurationError( - `History shadow not found for agent '${agentName}'. Create it first with createHistoryShadow().`, - { - agentName, - operation: 'appendToHistory', - timestamp: new Date(), - metadata: { shadowPath }, - }, - error instanceof Error ? error : undefined - ); - } - - // Find section or create it - const sectionHeader = `## ${section}`; - const sectionRegex = new RegExp(`^${sectionHeader}\\s*$([\\s\\S]*?)(?=^##\\s|\\Z)`, 'm'); - const match = historyContent.match(sectionRegex); - - const timestamp = new Date().toISOString().split('T')[0]; // YYYY-MM-DD - const entry = `\n### ${timestamp}\n\n${content}\n`; - - let updatedContent: string; - - if (match) { - // Section exists, append to it - const fullMatch = match[0]; - const sectionContent = match[1]; - const updatedSection = `${sectionHeader}\n${sectionContent!.trimEnd()}${entry}`; - updatedContent = historyContent.replace(fullMatch, updatedSection); - } else { - // Section doesn't exist, create it at the end - updatedContent = historyContent.trimEnd() + `\n\n${sectionHeader}${entry}`; - } - - await fs.writeFile(shadowPath, updatedContent, 'utf-8'); + // Acquire file lock before the read-modify-write cycle (#479) + await withFileLock(shadowPath, async () => { + // Read existing history (inside lock to prevent races) + let historyContent: string; + try { + historyContent = await fs.readFile(shadowPath, 'utf-8'); + } catch (error) { + throw new ConfigurationError( + `History shadow not found for agent '${agentName}'. Create it first with createHistoryShadow().`, + { + agentName, + operation: 'appendToHistory', + timestamp: new Date(), + metadata: { shadowPath }, + }, + error instanceof Error ? error : undefined + ); + } + + // Find section or create it + const sectionHeader = `## ${section}`; + const sectionRegex = new RegExp(`^${sectionHeader}\\s*$([\\s\\S]*?)(?=^##\\s|\\Z)`, 'm'); + const match = historyContent.match(sectionRegex); + + const timestamp = new Date().toISOString().split('T')[0]; // YYYY-MM-DD + const entry = `\n### ${timestamp}\n\n${content}\n`; + + let updatedContent: string; + + if (match) { + // Section exists, append to it + const fullMatch = match[0]; + const sectionContent = match[1]; + const updatedSection = `${sectionHeader}\n${sectionContent!.trimEnd()}${entry}`; + updatedContent = historyContent.replace(fullMatch, updatedSection); + } else { + // Section doesn't exist, create it at the end + updatedContent = historyContent.trimEnd() + `\n\n${sectionHeader}${entry}`; + } + + // Atomic write: temp file + rename prevents partial reads + await atomicWriteFile(shadowPath, updatedContent); + }); } catch (error) { if (error instanceof ConfigurationError) { diff --git a/test/history-shadow.test.ts b/test/history-shadow.test.ts new file mode 100644 index 000000000..be851cb9c --- /dev/null +++ b/test/history-shadow.test.ts @@ -0,0 +1,208 @@ +/** + * Tests for history-shadow.ts — specifically the race condition fix (#479). + * + * Verifies that: + * 1. appendToHistory still works correctly for single callers (regression) + * 2. Concurrent appendToHistory calls do not lose data (the race condition) + * 3. Lock files are cleaned up after operations complete + */ + +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import * as fs from 'fs/promises'; +import * as path from 'path'; +import * as os from 'os'; +import { + createHistoryShadow, + appendToHistory, + readHistory, + deleteHistoryShadow, + shadowExists, +} from '@bradygaster/squad-sdk/agents'; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'squad-history-test-')); +}); + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +describe('history-shadow (#479 race condition fix)', () => { + + describe('appendToHistory — single caller (regression)', () => { + it('should append to an existing section', async () => { + await createHistoryShadow(tmpDir, 'testbot', 'Initial context'); + + await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Learned thing one'); + + const history = await readHistory(tmpDir, 'testbot'); + expect(history.learnings).toContain('Learned thing one'); + }); + + it('should append multiple entries sequentially', async () => { + await createHistoryShadow(tmpDir, 'testbot'); + + await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Entry A'); + await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Entry B'); + await appendToHistory(tmpDir, 'testbot', 'Decisions', 'Decision X'); + + const history = await readHistory(tmpDir, 'testbot'); + expect(history.learnings).toContain('Entry A'); + expect(history.learnings).toContain('Entry B'); + expect(history.decisions).toContain('Decision X'); + }); + + it('should create a section if it does not exist', async () => { + // Create a minimal history file without a Patterns section + const agentDir = path.join(tmpDir, '.squad', 'agents', 'testbot'); + await fs.mkdir(agentDir, { recursive: true }); + await fs.writeFile( + path.join(agentDir, 'history.md'), + '# Testbot — Session History\n\n## Learnings\n\n\n', + 'utf-8', + ); + + await appendToHistory(tmpDir, 'testbot', 'Patterns', 'Pattern found'); + + const history = await readHistory(tmpDir, 'testbot'); + expect(history.fullContent).toContain('## Patterns'); + expect(history.fullContent).toContain('Pattern found'); + }); + }); + + describe('appendToHistory — concurrent callers (race condition)', () => { + it('should not lose data when 5 agents append concurrently', async () => { + await createHistoryShadow(tmpDir, 'sharedbot', 'Shared context'); + + const entries = Array.from({ length: 5 }, (_, i) => `Concurrent entry ${i}`); + + // Fire all appends concurrently — the old code would lose all but the last + await Promise.all( + entries.map(entry => + appendToHistory(tmpDir, 'sharedbot', 'Learnings', entry), + ), + ); + + const history = await readHistory(tmpDir, 'sharedbot'); + + // Every single entry must be present — no last-write-wins data loss + for (const entry of entries) { + expect(history.learnings).toContain(entry); + } + }); + + it('should not lose data when appending to different sections concurrently', async () => { + await createHistoryShadow(tmpDir, 'multibot', 'Multi-section test'); + + await Promise.all([ + appendToHistory(tmpDir, 'multibot', 'Learnings', 'Learn concurrent'), + appendToHistory(tmpDir, 'multibot', 'Decisions', 'Decide concurrent'), + appendToHistory(tmpDir, 'multibot', 'Patterns', 'Pattern concurrent'), + ]); + + const history = await readHistory(tmpDir, 'multibot'); + expect(history.learnings).toContain('Learn concurrent'); + expect(history.decisions).toContain('Decide concurrent'); + expect(history.fullContent).toContain('Pattern concurrent'); + }); + + it('should handle 10 rapid concurrent appends without data loss', async () => { + await createHistoryShadow(tmpDir, 'stressbot', 'Stress test'); + + const count = 10; + const entries = Array.from({ length: count }, (_, i) => `Stress-entry-${i}`); + + await Promise.all( + entries.map(entry => + appendToHistory(tmpDir, 'stressbot', 'Learnings', entry), + ), + ); + + const history = await readHistory(tmpDir, 'stressbot'); + for (const entry of entries) { + expect(history.learnings).toContain(entry); + } + }); + }); + + describe('lock file cleanup', () => { + it('should not leave lock files after successful operation', async () => { + await createHistoryShadow(tmpDir, 'cleanbot'); + await appendToHistory(tmpDir, 'cleanbot', 'Learnings', 'Clean entry'); + + const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot'); + const files = await fs.readdir(agentDir); + const lockFiles = files.filter(f => f.endsWith('.lock')); + + expect(lockFiles).toHaveLength(0); + }); + + it('should not leave temp files after successful operation', async () => { + await createHistoryShadow(tmpDir, 'cleanbot2'); + await appendToHistory(tmpDir, 'cleanbot2', 'Learnings', 'Clean entry 2'); + + const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot2'); + const files = await fs.readdir(agentDir); + const tmpFiles = files.filter(f => f.endsWith('.tmp')); + + expect(tmpFiles).toHaveLength(0); + }); + + it('should not leave lock files after concurrent operations', async () => { + await createHistoryShadow(tmpDir, 'cleanbot3'); + + await Promise.all( + Array.from({ length: 5 }, (_, i) => + appendToHistory(tmpDir, 'cleanbot3', 'Learnings', `Clean concurrent ${i}`), + ), + ); + + const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot3'); + const files = await fs.readdir(agentDir); + const lockFiles = files.filter(f => f.endsWith('.lock')); + const tmpFiles = files.filter(f => f.endsWith('.tmp')); + + expect(lockFiles).toHaveLength(0); + expect(tmpFiles).toHaveLength(0); + }); + }); + + describe('existing API contract (regression)', () => { + it('createHistoryShadow creates the file', async () => { + const shadowPath = await createHistoryShadow(tmpDir, 'newbot', 'Hello'); + expect(await shadowExists(tmpDir, 'newbot')).toBe(true); + const content = await fs.readFile(shadowPath, 'utf-8'); + expect(content).toContain('Hello'); + }); + + it('createHistoryShadow does not overwrite existing', async () => { + await createHistoryShadow(tmpDir, 'existbot', 'First'); + await appendToHistory(tmpDir, 'existbot', 'Learnings', 'Important'); + + // Call create again — must not overwrite + await createHistoryShadow(tmpDir, 'existbot', 'Second'); + const history = await readHistory(tmpDir, 'existbot'); + expect(history.learnings).toContain('Important'); + }); + + it('deleteHistoryShadow removes the file', async () => { + await createHistoryShadow(tmpDir, 'delbot'); + expect(await shadowExists(tmpDir, 'delbot')).toBe(true); + await deleteHistoryShadow(tmpDir, 'delbot'); + expect(await shadowExists(tmpDir, 'delbot')).toBe(false); + }); + + it('readHistory returns empty for non-existent agent', async () => { + const history = await readHistory(tmpDir, 'ghost'); + expect(history.fullContent).toBe(''); + }); + + it('appendToHistory throws if shadow does not exist', async () => { + await expect( + appendToHistory(tmpDir, 'nobot', 'Learnings', 'Fail'), + ).rejects.toThrow(/History shadow not found/); + }); + }); +});