Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/engine/src/services/chunkEncoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ describe("buildEncoderArgs HDR color space", () => {
expect.stringContaining("HDR is not supported with codec=h264"),
);
warnSpy.mockRestore();

});

it("uses range conversion for HDR CPU encoding", () => {
Expand Down
248 changes: 247 additions & 1 deletion packages/engine/src/services/streamingEncoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
* master-display / max-cll and ship as SDR BT.2020 again.
*/

import { describe, expect, it } from "vitest";
import { EventEmitter } from "events";
import { mkdtempSync } from "fs";
import { tmpdir } from "os";
import { join } from "path";
import { afterEach, describe, expect, it, vi } from "vitest";

import {
buildStreamingArgs,
Expand Down Expand Up @@ -284,3 +288,245 @@ describe("createFrameReorderBuffer", () => {
await buf.waitForAllDone();
});
});

interface FakeStdin extends EventEmitter {
destroyed: boolean;
end: (cb?: () => void) => void;
write: (chunk: Buffer) => boolean;
}

interface FakeProc extends EventEmitter {
stdin: FakeStdin;
stdout: EventEmitter;
stderr: EventEmitter;
kill: ReturnType<typeof vi.fn>;
}

interface SpawnCall {
command: string;
args: readonly string[];
proc: FakeProc;
}

function createFakeStdin(): FakeStdin {
const state = { destroyed: false };
const stdin = new EventEmitter() as FakeStdin;
Object.defineProperty(stdin, "destroyed", {
get: () => state.destroyed,
set: (v: boolean) => {
state.destroyed = v;
},
});
stdin.end = (cb?: () => void) => {
state.destroyed = true;
if (cb) process.nextTick(cb);
};
stdin.write = (_chunk: Buffer): boolean => !state.destroyed;
return stdin;
}

function createFakeProc(): FakeProc {
const proc = new EventEmitter() as FakeProc;
proc.stdin = createFakeStdin();
proc.stdout = new EventEmitter();
proc.stderr = new EventEmitter();
proc.kill = vi.fn();
return proc;
}

function createSpawnSpy(): {
spawn: (command: string, args: readonly string[]) => FakeProc;
calls: SpawnCall[];
} {
const calls: SpawnCall[] = [];
const spawn = (command: string, args: readonly string[]): FakeProc => {
const proc = createFakeProc();
calls.push({ command, args, proc });
return proc;
};
return { spawn, calls };
}

const baseOptions: StreamingEncoderOptions = {
fps: 30,
width: 100,
height: 100,
codec: "h264",
useGpu: false,
};

describe("spawnStreamingEncoder lifecycle and cleanup", () => {
afterEach(() => {
vi.resetModules();
vi.doUnmock("child_process");
});

it("returns a success result when ffmpeg exits cleanly after close()", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-success-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

expect(calls).toHaveLength(1);
expect(calls[0]?.command).toBe("ffmpeg");

const proc = calls[0]!.proc;
const closePromise = encoder.close();
process.nextTick(() => proc.emit("close", 0));

const result = await closePromise;
expect(result.success).toBe(true);
expect(result.error).toBeUndefined();
expect(result.fileSize).toBe(0); // No real ffmpeg, no file written
});

it("returns a failure result (does NOT throw) when ffmpeg exits non-zero before close()", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-fail-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
proc.stderr.emit("data", Buffer.from("Encoder error\n"));
await new Promise<void>((resolve) => {
process.nextTick(() => {
proc.emit("close", 1);
resolve();
});
});

const result = await encoder.close();
expect(result.success).toBe(false);
expect(result.error).toContain("FFmpeg exited with code 1");
expect(result.error).toContain("Encoder error");
});

it("returns a failure result (does NOT throw) when ffmpeg fails to spawn (ENOENT)", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-enoent-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
await new Promise<void>((resolve) => {
process.nextTick(() => {
const err = new Error("spawn ffmpeg ENOENT") as NodeJS.ErrnoException;
err.code = "ENOENT";
proc.emit("error", err);
resolve();
});
});

const result = await encoder.close();
expect(result.success).toBe(false);
expect(result.error).toMatch(/spawn ffmpeg ENOENT/);
});

it("returns a 'cancelled' result and SIGTERMs ffmpeg when the abort signal fires", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const controller = new AbortController();
const dir = mkdtempSync(join(tmpdir(), "se-abort-"));
const encoder = await spawnStreamingEncoder(
join(dir, "out.mp4"),
baseOptions,
controller.signal,
);

const proc = calls[0]!.proc;
controller.abort();
expect(proc.kill).toHaveBeenCalledWith("SIGTERM");

process.nextTick(() => proc.emit("close", null));
const result = await encoder.close();

expect(result.success).toBe(false);
expect(result.error).toBe("Streaming encode cancelled");
});

it("close() is idempotent: a second call still resolves to a result and does not throw", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-idempotent-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
process.nextTick(() => proc.emit("close", 0));

const first = await encoder.close();
expect(first.success).toBe(true);

// Defensive cleanup in renderOrchestrator may call close() again after the
// explicit call. Verify the second call doesn't reject — it can return
// either success (cached) or a benign failure result, but must not throw.
let threw = false;
try {
const second = await encoder.close();
expect(typeof second.success).toBe("boolean");
} catch {
threw = true;
}
expect(threw).toBe(false);
});

it("writeFrame returns false after ffmpeg has exited", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-writefail-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

expect(encoder.writeFrame(Buffer.from([0]))).toBe(true);

const proc = calls[0]!.proc;
await new Promise<void>((resolve) => {
process.nextTick(() => {
proc.emit("close", 0);
resolve();
});
});

expect(encoder.writeFrame(Buffer.from([0]))).toBe(false);
});

it("close() removes the abort listener so a post-close abort does not re-kill ffmpeg", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const controller = new AbortController();
const dir = mkdtempSync(join(tmpdir(), "se-detach-"));
const encoder = await spawnStreamingEncoder(
join(dir, "out.mp4"),
baseOptions,
controller.signal,
);

const proc = calls[0]!.proc;
process.nextTick(() => proc.emit("close", 0));
await encoder.close();

expect(proc.kill).not.toHaveBeenCalled();

controller.abort();
expect(proc.kill).not.toHaveBeenCalled();
});
});
Loading