Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions doc/api/child_process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,39 @@ console.log(`Spawned child pid: ${grep.pid}`);
grep.stdin.end();
```

### `subprocess.readLines([options])`

<!-- YAML
added: REPLACEME
-->

* `options` {Object}
* `rejectIfNonZeroExitCode` {boolean} **Default:** `false`
* `listenTo` {string} Can be one of `'stdout'`, or `'stderr'`.
**Default:** `'stdout'`
* Returns: {readline.InterfaceConstructor}

Convenience method to create a `node:readline` interface and stream over the
output of the child process.

```mjs
import { spawn } from 'node:child_process';

for await (const fileInfo of spawn('ls', ['-a']).readLines()) {
console.log(fileInfo);
}
```

```cjs
const { spawn } = require('node:child_process');

(async () => {
for await (const fileInfo of spawn('ls', ['-a']).readLines()) {
console.log(fileInfo);
}
})();
```

### `subprocess.ref()`

<!-- YAML
Expand Down
4 changes: 3 additions & 1 deletion lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ const {
} = require('internal/validators');
const child_process = require('internal/child_process');
const {
ChildProcess,
getValidStdio,
kSignal,
setupChannel,
ChildProcess,
stdioStringToArray,
} = child_process;

Expand Down Expand Up @@ -782,6 +783,7 @@ function spawn(file, args, options) {

if (options.signal) {
const signal = options.signal;
child[kSignal] = signal;
if (signal.aborted) {
process.nextTick(onAbortListener);
} else {
Expand Down
82 changes: 81 additions & 1 deletion lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ const {
ArrayIsArray,
ArrayPrototypePush,
ArrayPrototypeReduce,
ArrayPrototypeShift,
ArrayPrototypeSlice,
FunctionPrototype,
FunctionPrototypeCall,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Promise,
PromiseResolve,
PromisePrototypeThen,
ReflectApply,
SafePromiseRace,
StringPrototypeSlice,
Symbol,
SymbolAsyncIterator,
SymbolDispose,
Uint8Array,
} = primordials;
Expand All @@ -29,9 +35,11 @@ const {
ERR_IPC_SYNC_FORK,
ERR_MISSING_ARGS,
},
genericNodeError,
} = require('internal/errors');
const {
validateArray,
validateBoolean,
validateObject,
validateOneOf,
validateString,
Expand All @@ -41,6 +49,7 @@ const net = require('net');
const dgram = require('dgram');
const inspect = require('internal/util/inspect').inspect;
const assert = require('internal/assert');
const { Interface } = require('internal/readline/interface');

const { Process } = internalBinding('process_wrap');
const {
Expand All @@ -56,7 +65,7 @@ const { TTY } = internalBinding('tty_wrap');
const { UDP } = internalBinding('udp_wrap');
const SocketList = require('internal/socket_list');
const { owner_symbol } = require('internal/async_hooks').symbols;
const { convertToValidSignal, deprecate } = require('internal/util');
const { convertToValidSignal, deprecate, kEmptyObject } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const spawn_sync = internalBinding('spawn_sync');
const { kStateSymbol } = require('internal/dgram');
Expand Down Expand Up @@ -85,6 +94,7 @@ const MAX_HANDLE_RETRANSMISSIONS = 3;
const kChannelHandle = Symbol('kChannelHandle');
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
const kPendingMessages = Symbol('kPendingMessages');
const kSignal = Symbol('kSignal');

// This object contain function to convert TCP objects to native handle objects
// and back again.
Expand Down Expand Up @@ -533,6 +543,75 @@ ChildProcess.prototype.unref = function() {
if (this._handle) this._handle.unref();
};

ChildProcess.prototype.readLines = function readLines(options = undefined) {
const { listenTo, rejectIfNonZeroExitCode = false } = options ?? kEmptyObject;
if (options != null) {
if (listenTo != null) validateOneOf(listenTo, 'options.listenTo', ['stdout', 'stderr']);
validateBoolean(rejectIfNonZeroExitCode, 'options.rejectIfNonZeroExitCode');
}
let input;
switch (listenTo) {
// TODO(aduh95): add case 'both'

case 'stderr':
input = this.stderr;
break;

default:
input = this.stdout;
}
const rlInterface = new Interface({
__proto__: null,
signal: this[kSignal],
input,
});
const errorPromise = new Promise((_, reject) => this.once('error', reject));
const exitPromise = new Promise((resolve, reject) => this.once('close', rejectIfNonZeroExitCode ? (code) => {
if (code === 0) resolve({ done: true, value: undefined });
else reject(genericNodeError('Command failed', { pid: this.pid, signal: this.signalCode, status: code }));
} : () => resolve({ done: true, value: undefined })));
rlInterface[SymbolAsyncIterator] = function() {
let resolveNext;
let nextPromise = new Promise((resolve) => {
resolveNext = resolve;
});
const buffer = ObjectSetPrototypeOf([], null);
this.on('line', (value) => {
if (resolveNext != null) {
resolveNext(value);
resolveNext = null;
} else {
ArrayPrototypePush(buffer, value);
}
},
);
return { __proto__: null,
next: () => SafePromiseRace([
errorPromise,
(() => {
if (nextPromise != null) {
const p = nextPromise;
nextPromise = buffer.length === 0 ? new Promise((resolve) => {
PromisePrototypeThen(p, () => {
resolveNext = resolve;
});
}) : null;
return p;
}
const iteratorObject = { value: ArrayPrototypeShift(buffer), done: false };
if (buffer.length === 0) {
nextPromise = new Promise((resolve) => {
resolveNext = resolve;
});
}
return PromiseResolve(iteratorObject);
})(),
exitPromise]),
[SymbolAsyncIterator]() { return this; } };
};
return rlInterface;
};

class Control extends EventEmitter {
#channel = null;
#refs = 0;
Expand Down Expand Up @@ -1132,6 +1211,7 @@ function spawnSync(options) {
module.exports = {
ChildProcess,
kChannelHandle,
kSignal,
setupChannel,
getValidStdio,
stdioStringToArray,
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/readline/interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ const kRefreshLine = Symbol('_refreshLine');
const kSawKeyPress = Symbol('_sawKeyPress');
const kSawReturnAt = Symbol('_sawReturnAt');
const kSetRawMode = Symbol('_setRawMode');
const kSignal = Symbol('kSignal');
const kTabComplete = Symbol('_tabComplete');
const kTabCompleter = Symbol('_tabCompleter');
const kTtyWrite = Symbol('_ttyWrite');
Expand Down Expand Up @@ -322,13 +323,16 @@ function InterfaceConstructor(input, output, completer, terminal) {
}

if (signal) {
this[kSignal] = signal;
const onAborted = () => self.close();
if (signal.aborted) {
process.nextTick(onAborted);
} else {
const disposable = EventEmitter.addAbortListener(signal, onAborted);
self.once('close', disposable[SymbolDispose]);
}
} else {
this[kSignal] = undefined;
}

// Current line
Expand Down Expand Up @@ -1357,6 +1361,8 @@ class Interface extends InterfaceConstructor {
kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam;
this[kLineObjectStream] = EventEmitter.on(
this, 'line', {
__proto__: null,
signal: this[kSignal],
close: ['close'],
highWatermark: 1024,
[kFirstEventParam]: true,
Expand Down
88 changes: 88 additions & 0 deletions test/parallel/test-child-process-readLines.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import * as common from '../common/index.mjs';

import assert from 'node:assert';
import { spawn, exec } from 'node:child_process';


{
const lines = exec(`${process.execPath} -p 42`).readLines();

lines.on('line', common.mustCall((result) => {
assert.strictEqual(result, '42');
lines.on('line', common.mustNotCall('We expect only one line event'));
}));
}

{
const fn = common.mustCall();
for await (const line of spawn(process.execPath, ['-p', 42]).readLines()) {
assert.strictEqual(line, '42');
fn();
}
}

{
const cp = spawn(process.execPath, ['-p', 42]);

[0, 1, '', 'a', 0n, 1n, Symbol(), () => {}, {}, []].forEach((listenTo) => assert.throws(
() => cp.readLines({ listenTo }),
{ code: 'ERR_INVALID_ARG_VALUE' }));

[0, 1, '', 'a', 0n, 1n, Symbol(), () => {}, {}, []].forEach((rejectIfNonZeroExitCode) => assert.throws(
() => cp.readLines({ rejectIfNonZeroExitCode }),
{ code: 'ERR_INVALID_ARG_TYPE' }));
}

await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of spawn(process.execPath, ['-p', 42], { signal: AbortSignal.abort() }).readLines());
}, { name: 'AbortError' });


await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of spawn(process.execPath, { signal: AbortSignal.abort() }).readLines());
}, { name: 'AbortError' });

{
const ac = new AbortController();
const cp = spawn(
process.execPath,
['-e', 'setTimeout(()=>console.log("line 2"), 10);setImmediate(()=>console.log("line 1"));'],
{ signal: ac.signal });
await assert.rejects(async () => {
for await (const line of cp.readLines()) {
assert.strictEqual(line, 'line 1');
ac.abort();
}
}, { name: 'AbortError' });
assert.strictEqual(ac.signal.aborted, true);
}

{
const cp = spawn(process.execPath, ['-e', 'throw null']);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of cp.readLines({ rejectIfNonZeroExitCode: true }));
}, { pid: cp.pid, status: 1, signal: null });
}


{
const fn = common.mustCall();
for await (const line of spawn(process.execPath, ['-e', 'console.error(42)']).readLines({ listenTo: 'stderr' })) {
assert.strictEqual(line, '42');
fn();
}
}

{
const stderr = (await spawn(process.execPath, ['-e', 'throw new Error']).stderr.toArray()).join('').split(/\r?\n/);
const cp = spawn(process.execPath, ['-e', 'throw new Error']);
assert.strictEqual(cp.exitCode, null);
for await (const line of cp.readLines({ listenTo: 'stderr' })) {
assert.strictEqual(line, stderr.shift());
}
assert.strictEqual(cp.exitCode, 1);
assert.deepStrictEqual(stderr, ['']);
}