Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 118 additions & 39 deletions packages/squad-sdk/src/agents/history-shadow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,81 @@ import * as fs from 'fs/promises';
import * as path from 'path';
import { ConfigurationError } from '../adapter/errors.js';

// ---------------------------------------------------------------------------
// Async mutex — prevents read-modify-write races when multiple agents
// concurrently append to the same history.md (#479).
//
// Two layers of protection:
// 1. In-process async mutex (handles concurrent agents in one Node.js process)
// 2. Atomic writes via temp-file + rename (prevents partial reads)
// ---------------------------------------------------------------------------

/**
* Per-file async mutex. Keyed by resolved file path so concurrent calls
* targeting the same history.md are serialized, while calls to different
* files run in parallel.
* @private
*/
const fileLocks = new Map<string, Promise<void>>();

/**
* Execute `fn` while holding an in-process async mutex for `filePath`.
*
* Concurrent callers for the same path are queued — each waits for the
* previous to finish before starting. Different paths run in parallel.
*
* @private
*/
async function withFileLock<T>(
filePath: string,
fn: () => Promise<T>,
): Promise<T> {
// Normalize path to prevent two representations of the same file
const key = path.resolve(filePath);

// Chain: wait for whoever holds the lock right now
const prev = fileLocks.get(key) ?? Promise.resolve();

let releaseLock!: () => void;
const gate = new Promise<void>(resolve => {
releaseLock = resolve;
});
fileLocks.set(key, gate);

// Wait until the previous holder finishes
await prev;

try {
return await fn();
} finally {
releaseLock();
// Clean up if we are the last in the chain
if (fileLocks.get(key) === gate) {
fileLocks.delete(key);
}
}
}

/**
* Write a file atomically by writing to a temp file then renaming.
* Prevents concurrent readers from seeing partial content.
* @private
*/
async function atomicWriteFile(
filePath: string,
content: string,
): Promise<void> {
const tmpPath = `${filePath}.${process.pid}.tmp`;
try {
await fs.writeFile(tmpPath, content, 'utf-8');
await fs.rename(tmpPath, filePath);
} catch (err) {
// Clean up temp file on failure
await fs.unlink(tmpPath).catch(() => {});
throw err;
}
}

/**
* Standard history sections that agents maintain.
*/
Expand Down Expand Up @@ -147,45 +222,49 @@ export async function appendToHistory(
try {
const shadowPath = path.join(teamRoot, '.squad', 'agents', agentName, 'history.md');

// Read existing history
let historyContent: string;
try {
historyContent = await fs.readFile(shadowPath, 'utf-8');
} catch (error) {
throw new ConfigurationError(
`History shadow not found for agent '${agentName}'. Create it first with createHistoryShadow().`,
{
agentName,
operation: 'appendToHistory',
timestamp: new Date(),
metadata: { shadowPath },
},
error instanceof Error ? error : undefined
);
}

// Find section or create it
const sectionHeader = `## ${section}`;
const sectionRegex = new RegExp(`^${sectionHeader}\\s*$([\\s\\S]*?)(?=^##\\s|\\Z)`, 'm');
const match = historyContent.match(sectionRegex);

const timestamp = new Date().toISOString().split('T')[0]; // YYYY-MM-DD
const entry = `\n### ${timestamp}\n\n${content}\n`;

let updatedContent: string;

if (match) {
// Section exists, append to it
const fullMatch = match[0];
const sectionContent = match[1];
const updatedSection = `${sectionHeader}\n${sectionContent!.trimEnd()}${entry}`;
updatedContent = historyContent.replace(fullMatch, updatedSection);
} else {
// Section doesn't exist, create it at the end
updatedContent = historyContent.trimEnd() + `\n\n${sectionHeader}${entry}`;
}

await fs.writeFile(shadowPath, updatedContent, 'utf-8');
// Acquire file lock before the read-modify-write cycle (#479)
await withFileLock(shadowPath, async () => {
// Read existing history (inside lock to prevent races)
let historyContent: string;
try {
historyContent = await fs.readFile(shadowPath, 'utf-8');
} catch (error) {
throw new ConfigurationError(
`History shadow not found for agent '${agentName}'. Create it first with createHistoryShadow().`,
{
agentName,
operation: 'appendToHistory',
timestamp: new Date(),
metadata: { shadowPath },
},
error instanceof Error ? error : undefined
);
}

// Find section or create it
const sectionHeader = `## ${section}`;
const sectionRegex = new RegExp(`^${sectionHeader}\\s*$([\\s\\S]*?)(?=^##\\s|\\Z)`, 'm');
const match = historyContent.match(sectionRegex);

const timestamp = new Date().toISOString().split('T')[0]; // YYYY-MM-DD
const entry = `\n### ${timestamp}\n\n${content}\n`;

let updatedContent: string;

if (match) {
// Section exists, append to it
const fullMatch = match[0];
const sectionContent = match[1];
const updatedSection = `${sectionHeader}\n${sectionContent!.trimEnd()}${entry}`;
updatedContent = historyContent.replace(fullMatch, updatedSection);
} else {
// Section doesn't exist, create it at the end
updatedContent = historyContent.trimEnd() + `\n\n${sectionHeader}${entry}`;
}

// Atomic write: temp file + rename prevents partial reads
await atomicWriteFile(shadowPath, updatedContent);
});

} catch (error) {
if (error instanceof ConfigurationError) {
Expand Down
208 changes: 208 additions & 0 deletions test/history-shadow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/**
* Tests for history-shadow.ts — specifically the race condition fix (#479).
*
* Verifies that:
* 1. appendToHistory still works correctly for single callers (regression)
* 2. Concurrent appendToHistory calls do not lose data (the race condition)
* 3. Lock files are cleaned up after operations complete
*/

import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import * as fs from 'fs/promises';
import * as path from 'path';
import * as os from 'os';
import {
createHistoryShadow,
appendToHistory,
readHistory,
deleteHistoryShadow,
shadowExists,
} from '@bradygaster/squad-sdk/agents';

let tmpDir: string;

beforeEach(async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'squad-history-test-'));
});

afterEach(async () => {
await fs.rm(tmpDir, { recursive: true, force: true });
});

describe('history-shadow (#479 race condition fix)', () => {

describe('appendToHistory — single caller (regression)', () => {
it('should append to an existing section', async () => {
await createHistoryShadow(tmpDir, 'testbot', 'Initial context');

await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Learned thing one');

const history = await readHistory(tmpDir, 'testbot');
expect(history.learnings).toContain('Learned thing one');
});

it('should append multiple entries sequentially', async () => {
await createHistoryShadow(tmpDir, 'testbot');

await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Entry A');
await appendToHistory(tmpDir, 'testbot', 'Learnings', 'Entry B');
await appendToHistory(tmpDir, 'testbot', 'Decisions', 'Decision X');

const history = await readHistory(tmpDir, 'testbot');
expect(history.learnings).toContain('Entry A');
expect(history.learnings).toContain('Entry B');
expect(history.decisions).toContain('Decision X');
});

it('should create a section if it does not exist', async () => {
// Create a minimal history file without a Patterns section
const agentDir = path.join(tmpDir, '.squad', 'agents', 'testbot');
await fs.mkdir(agentDir, { recursive: true });
await fs.writeFile(
path.join(agentDir, 'history.md'),
'# Testbot — Session History\n\n## Learnings\n\n<!-- empty -->\n',
'utf-8',
);

await appendToHistory(tmpDir, 'testbot', 'Patterns', 'Pattern found');

const history = await readHistory(tmpDir, 'testbot');
expect(history.fullContent).toContain('## Patterns');
expect(history.fullContent).toContain('Pattern found');
});
});

describe('appendToHistory — concurrent callers (race condition)', () => {
it('should not lose data when 5 agents append concurrently', async () => {
await createHistoryShadow(tmpDir, 'sharedbot', 'Shared context');

const entries = Array.from({ length: 5 }, (_, i) => `Concurrent entry ${i}`);

// Fire all appends concurrently — the old code would lose all but the last
await Promise.all(
entries.map(entry =>
appendToHistory(tmpDir, 'sharedbot', 'Learnings', entry),
),
);

const history = await readHistory(tmpDir, 'sharedbot');

// Every single entry must be present — no last-write-wins data loss
for (const entry of entries) {
expect(history.learnings).toContain(entry);
}
});

it('should not lose data when appending to different sections concurrently', async () => {
await createHistoryShadow(tmpDir, 'multibot', 'Multi-section test');

await Promise.all([
appendToHistory(tmpDir, 'multibot', 'Learnings', 'Learn concurrent'),
appendToHistory(tmpDir, 'multibot', 'Decisions', 'Decide concurrent'),
appendToHistory(tmpDir, 'multibot', 'Patterns', 'Pattern concurrent'),
]);

const history = await readHistory(tmpDir, 'multibot');
expect(history.learnings).toContain('Learn concurrent');
expect(history.decisions).toContain('Decide concurrent');
expect(history.fullContent).toContain('Pattern concurrent');
});

it('should handle 10 rapid concurrent appends without data loss', async () => {
await createHistoryShadow(tmpDir, 'stressbot', 'Stress test');

const count = 10;
const entries = Array.from({ length: count }, (_, i) => `Stress-entry-${i}`);

await Promise.all(
entries.map(entry =>
appendToHistory(tmpDir, 'stressbot', 'Learnings', entry),
),
);

const history = await readHistory(tmpDir, 'stressbot');
for (const entry of entries) {
expect(history.learnings).toContain(entry);
}
});
});

describe('lock file cleanup', () => {
it('should not leave lock files after successful operation', async () => {
await createHistoryShadow(tmpDir, 'cleanbot');
await appendToHistory(tmpDir, 'cleanbot', 'Learnings', 'Clean entry');

const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot');
const files = await fs.readdir(agentDir);
const lockFiles = files.filter(f => f.endsWith('.lock'));

expect(lockFiles).toHaveLength(0);
});

it('should not leave temp files after successful operation', async () => {
await createHistoryShadow(tmpDir, 'cleanbot2');
await appendToHistory(tmpDir, 'cleanbot2', 'Learnings', 'Clean entry 2');

const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot2');
const files = await fs.readdir(agentDir);
const tmpFiles = files.filter(f => f.endsWith('.tmp'));

expect(tmpFiles).toHaveLength(0);
});

it('should not leave lock files after concurrent operations', async () => {
await createHistoryShadow(tmpDir, 'cleanbot3');

await Promise.all(
Array.from({ length: 5 }, (_, i) =>
appendToHistory(tmpDir, 'cleanbot3', 'Learnings', `Clean concurrent ${i}`),
),
);

const agentDir = path.join(tmpDir, '.squad', 'agents', 'cleanbot3');
const files = await fs.readdir(agentDir);
const lockFiles = files.filter(f => f.endsWith('.lock'));
const tmpFiles = files.filter(f => f.endsWith('.tmp'));

expect(lockFiles).toHaveLength(0);
expect(tmpFiles).toHaveLength(0);
});
});

describe('existing API contract (regression)', () => {
it('createHistoryShadow creates the file', async () => {
const shadowPath = await createHistoryShadow(tmpDir, 'newbot', 'Hello');
expect(await shadowExists(tmpDir, 'newbot')).toBe(true);
const content = await fs.readFile(shadowPath, 'utf-8');
expect(content).toContain('Hello');
});

it('createHistoryShadow does not overwrite existing', async () => {
await createHistoryShadow(tmpDir, 'existbot', 'First');
await appendToHistory(tmpDir, 'existbot', 'Learnings', 'Important');

// Call create again — must not overwrite
await createHistoryShadow(tmpDir, 'existbot', 'Second');
const history = await readHistory(tmpDir, 'existbot');
expect(history.learnings).toContain('Important');
});

it('deleteHistoryShadow removes the file', async () => {
await createHistoryShadow(tmpDir, 'delbot');
expect(await shadowExists(tmpDir, 'delbot')).toBe(true);
await deleteHistoryShadow(tmpDir, 'delbot');
expect(await shadowExists(tmpDir, 'delbot')).toBe(false);
});

it('readHistory returns empty for non-existent agent', async () => {
const history = await readHistory(tmpDir, 'ghost');
expect(history.fullContent).toBe('');
});

it('appendToHistory throws if shadow does not exist', async () => {
await expect(
appendToHistory(tmpDir, 'nobot', 'Learnings', 'Fail'),
).rejects.toThrow(/History shadow not found/);
});
});
});
Loading