Skip to content
Merged
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
"@yarnpkg/core": "^4.0.3",
"chai": "^4.4.1",
"eslint": "^8.57.0",
"mock-stdin": "^1.0.0",
"oclif": "^4.4.18",
"tsx": "^4.7.1",
"typescript": "^5.3.3",
Expand Down
96 changes: 93 additions & 3 deletions src/commands/call.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
import { readFile } from 'node:fs/promises';
import { resolve } from 'node:path';
import process from 'node:process';

import { Args } from '@oclif/core';
import { Args, Flags } from '@oclif/core';
import { ActorStartOptions, ApifyClient } from 'apify-client';

import { ApifyCommand } from '../lib/apify_command.js';
import { SharedRunOnCloudFlags, runActorOrTaskOnCloud } from '../lib/commands/run-on-cloud.js';
import { LOCAL_CONFIG_PATH } from '../lib/consts.js';
import { CommandExitCodes, LOCAL_CONFIG_PATH } from '../lib/consts.js';
import { error } from '../lib/outputs.js';
import { getLocalConfig, getLocalUserInfo, getLoggedClientOrThrow } from '../lib/utils.js';

export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
static override description = 'Runs a specific Actor remotely on the Apify cloud platform.\n'
+ 'The Actor is run under your current Apify account. Therefore you need to be logged in by calling "apify login". '
+ 'It takes input for the Actor from the default local key-value store by default.';

static override flags = SharedRunOnCloudFlags('Actor');
static override flags = {
...SharedRunOnCloudFlags('Actor'),
input: Flags.string({
char: 'i',
description: 'Optional JSON input to be given to the Actor.',
required: false,
allowStdin: true,
exclusive: ['input-file'],
}),
'input-file': Flags.string({
aliases: ['if'],
// eslint-disable-next-line max-len
description: 'Optional path to a file with JSON input to be given to the Actor. The file must be a valid JSON file. You can also specify `-` to read from standard input.',
required: false,
allowStdin: true,
exclusive: ['input'],
}),
};

static override args = {
actorId: Args.string({
Expand Down Expand Up @@ -57,6 +77,13 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
runOpts.memory = this.flags.memory;
}

const inputOverride = await this.resolveInputOverride(cwd);

// Means we couldn't resolve input, so we should exit
if (inputOverride === false) {
return;
}

await runActorOrTaskOnCloud(apifyClient, {
actorOrTaskData: {
id: actorId,
Expand All @@ -65,6 +92,7 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
runOptions: runOpts,
type: 'Actor',
waitForFinishMillis,
inputOverride,
});
}

Expand Down Expand Up @@ -117,4 +145,66 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {

throw new Error('Please provide an Actor ID or name, or run this command from a directory with a valid Apify Actor.');
}

private async resolveInputOverride(cwd: string) {
let input: Record<string, unknown> | undefined;

if (!this.flags.input && !this.flags.inputFile) {
// Try reading stdin
const stdin = await this.readStdin(process.stdin);

if (stdin) {
try {
input = JSON.parse(stdin);
} catch (err) {
error({ message: `Cannot parse JSON input from standard input.\n ${(err as Error).message}` });
process.exitCode = CommandExitCodes.InvalidInput;
return false;
}
}

return input;
}

if (this.flags.input) {
switch (this.flags.input[0]) {
case '-': {
error({ message: 'You need to pipe something into standard input when you specify the `-` value to `--input`.' });
process.exitCode = CommandExitCodes.InvalidInput;
return false;
}
default: {
try {
input = JSON.parse(this.flags.input);
} catch (err) {
error({ message: `Cannot parse JSON input.\n ${(err as Error).message}` });
process.exitCode = CommandExitCodes.InvalidInput;
return false;
}
}
}
} else if (this.flags.inputFile) {
switch (this.flags.inputFile[0]) {
case '-': {
error({ message: 'You need to pipe something into standard input when you specify the `-` value to `--input-file`.' });
process.exitCode = CommandExitCodes.InvalidInput;
return false;
}
default: {
const fullPath = resolve(cwd, this.flags.inputFile);

try {
const fileContent = await readFile(fullPath, 'utf8');
input = JSON.parse(fileContent);
} catch (err) {
error({ message: `Cannot read input file at path "${fullPath}".\n ${(err as Error).message}` });
process.exitCode = CommandExitCodes.InvalidInput;
return false;
}
}
}
}

return input;
}
}
8 changes: 5 additions & 3 deletions src/lib/apify_command.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { once } from 'node:events';
import process from 'node:process';
import { finished } from 'node:stream/promises';

import { Command, Interfaces, loadHelpClass } from '@oclif/core';

Expand Down Expand Up @@ -80,14 +80,16 @@ export abstract class ApifyCommand<T extends typeof Command> extends Command {
// The isTTY params says if TTY is connected to the process, if so the stdout is
// synchronous and the stdout steam is empty.
// See https://nodejs.org/docs/latest-v12.x/api/process.html#process_a_note_on_process_i_o
if (stdinStream.isTTY) return;
if (stdinStream.isTTY || stdinStream.readableEnded) {
return;
}

const bufferChunks: Buffer[] = [];
stdinStream.on('data', (chunk) => {
bufferChunks.push(chunk);
});

await finished(stdinStream);
await once(stdinStream, 'end');
return Buffer.concat(bufferChunks).toString('utf-8');
}

Expand Down
33 changes: 33 additions & 0 deletions src/lib/commands/resolve-input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import mime from 'mime';

import { getLocalInput } from '../utils.js';

export function resolveInput(cwd: string, inputOverride: Record<string, unknown> | undefined) {
let inputToUse: Record<string, unknown> | undefined;
let contentType!: string;

if (inputOverride) {
inputToUse = inputOverride;
contentType = 'application/json';
} else {
const localInput = getLocalInput(cwd);

if (localInput) {
const ext = mime.getExtension(localInput.contentType!);

if (ext === 'json') {
inputToUse = JSON.parse(localInput.body.toString('utf8'));
contentType = 'application/json';
} else {
inputToUse = localInput.body as never;
contentType = localInput.contentType!;
}
}
}

if (!inputToUse || !contentType) {
return null;
}

return { inputToUse, contentType };
}
15 changes: 8 additions & 7 deletions src/lib/commands/run-on-cloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import process from 'node:process';
import { ACTOR_JOB_STATUSES } from '@apify/consts';
import { Flags } from '@oclif/core';
import { ActorRun, ApifyClient, TaskStartOptions } from 'apify-client';
import mime from 'mime';

import { resolveInput } from './resolve-input.js';
import { CommandExitCodes } from '../consts.js';
import { error, link, run as runLog, success, warning } from '../outputs.js';
import { getLocalInput, outputJobLog } from '../utils.js';
import { outputJobLog } from '../utils.js';

export interface RunOnCloudOptions {
actorOrTaskData: {
Expand All @@ -18,6 +18,7 @@ export interface RunOnCloudOptions {
runOptions: TaskStartOptions;
type: 'Actor' | 'Task';
waitForFinishMillis?: number;
inputOverride?: Record<string, unknown>;
}

export async function runActorOrTaskOnCloud(apifyClient: ApifyClient, options: RunOnCloudOptions) {
Expand All @@ -27,12 +28,13 @@ export async function runActorOrTaskOnCloud(apifyClient: ApifyClient, options: R
runOptions,
type,
waitForFinishMillis,
inputOverride,
} = options;

const clientMethod = type === 'Actor' ? 'actor' : 'task';

// Get input for act
const localInput = getLocalInput(cwd);
// Get input for actor
const actorInput = resolveInput(cwd, inputOverride);

if (type === 'Actor') {
runLog({ message: `Calling ${type} ${actorOrTaskData.userFriendlyId} (${actorOrTaskData.id})` });
Expand All @@ -45,12 +47,11 @@ export async function runActorOrTaskOnCloud(apifyClient: ApifyClient, options: R
let run: ActorRun;

try {
if (localInput && type === 'Actor') {
if (actorInput && type === 'Actor') {
// TODO: For some reason we cannot pass json as buffer with right contentType into apify-client.
// It will save malformed JSON which looks like buffer as INPUT.
// We need to fix this in v1 during removing call under Actor namespace.
const input = mime.getExtension(localInput.contentType!) === 'json' ? JSON.parse(localInput.body.toString('utf-8')) : localInput.body;
run = await apifyClient[clientMethod](actorOrTaskData.id).start(input, { ...runOptions, contentType: localInput.contentType! });
run = await apifyClient[clientMethod](actorOrTaskData.id).start(actorInput.inputToUse, { ...runOptions, contentType: actorInput.contentType });
} else {
run = await apifyClient[clientMethod](actorOrTaskData.id).start(undefined, runOptions);
}
Expand Down
30 changes: 0 additions & 30 deletions test/__setup__/hooks/useProcessCwdMock.ts

This file was deleted.

51 changes: 51 additions & 0 deletions test/__setup__/hooks/useProcessMock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { MockSTDIN, stdin as fstdin } from 'mock-stdin';

interface ProcessMockOptions {
cwdMock: () => string;
mockStdin?: boolean;
}

export function useProcessMock({ cwdMock, mockStdin }: ProcessMockOptions) {
let actualStdin: unknown = process.stdin;

if (mockStdin) {
actualStdin = fstdin();
}

vitest.doMock('node:process', async () => {
const actual = await import('node:process');

return {
...actual,
cwd: cwdMock,
stdin: actualStdin,
default: {
...actual,
cwd: cwdMock,
stdin: actualStdin,
},
};
});

vitest.doMock('process', async () => {
const actual = await import('process');

return {
...actual,
cwd: cwdMock,
stdin: actualStdin,
default: {
...actual,
cwd: cwdMock,
stdin: actualStdin,
},
};
});

const processCwdSpy = vitest.spyOn(process, 'cwd');
processCwdSpy.mockImplementation(cwdMock);

return {
stdin: actualStdin as MockSTDIN,
};
}
19 changes: 16 additions & 3 deletions test/__setup__/hooks/useTempPath.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { mkdir } from 'node:fs/promises';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';

import { useProcessCwdMock } from './useProcessCwdMock.js';
import { MockSTDIN } from 'mock-stdin';

import { useProcessMock } from './useProcessMock.js';
import { rimrafPromised } from '../../../src/lib/files.js';

export interface UseTempPathOptions {
Expand Down Expand Up @@ -30,20 +32,29 @@ export interface UseTempPathOptions {
* @default false
*/
cwdParent: boolean;

/**
* If true, the stdin will also be mocked.
*/
withStdinMock?: boolean;
}

export function useTempPath(
path: string,
{ create, remove, cwd, cwdParent }: UseTempPathOptions = { create: true, remove: true, cwd: false, cwdParent: false },
{ create, remove, cwd, cwdParent, withStdinMock }: UseTempPathOptions = { create: true, remove: true, cwd: false, cwdParent: false, withStdinMock: false },
) {
const tmpPath = join(fileURLToPath(import.meta.url), '..', '..', '..', 'tmp', path);
const cwdPath = cwdParent ? join(fileURLToPath(import.meta.url), '..', '..', '..', 'tmp') : tmpPath;

let usedCwd = cwdPath;

let mockedStdin = process.stdin as unknown as MockSTDIN;

if (cwd) {
const cwdMock = () => usedCwd;
useProcessCwdMock(cwdMock);

const { stdin } = useProcessMock({ cwdMock, mockStdin: withStdinMock });
mockedStdin = stdin;
}

return {
Expand Down Expand Up @@ -75,5 +86,7 @@ export function useTempPath(
forceNewCwd: (newCwd: string) => {
usedCwd = join(cwdPath, newCwd);
},

stdin: mockedStdin,
};
}
3 changes: 3 additions & 0 deletions test/__setup__/test-data/input-file.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"hello": "world"
}
Loading