Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/async/src/services/asyncjobservice.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class AsyncJobServiceTest extends WebdaTest {

await action.refresh();
assert.strictEqual(action.status, "ERROR");
assert.strictEqual(action.errorMessage, "WebdaAsyncAction must have method and serviceName defined at least");
assert.strictEqual(action.errorMessage, "Invalid AsyncAction");

// Run without serviceName
await action.patch({
Expand All @@ -457,7 +457,7 @@ class AsyncJobServiceTest extends WebdaTest {
await service.runAsyncOperationAction();
await action.refresh();
assert.strictEqual(action.status, "ERROR");
assert.strictEqual(action.errorMessage, "WebdaAsyncAction must have method and serviceName defined at least");
assert.strictEqual(action.errorMessage, "Invalid AsyncAction");

// Run with unknown service
await action.patch({
Expand Down
103 changes: 72 additions & 31 deletions packages/async/src/services/asyncjobservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@
}

// This is internal job reporting so no need to document the api
this.addRoute(`${this.parameters.url}/status`, ["POST"], this.statusHook, { hidden: true });
this.addRoute(`${this.parameters.url}/status`, ["POST"], this.statusHook, {
hidden: true
});

// Add upload/download route if needed
if (this.parameters.binaryStore) {
Expand Down Expand Up @@ -366,13 +368,19 @@
}
if (selectedRunner === undefined) {
this.log("ERROR", `Cannot find a runner for action ${event.uuid}`);
await this.model.ref(event.uuid).patch({ status: "ERROR", errorMessage: `No runner found for the job` });
await this.model.ref(event.uuid).patch({
status: "ERROR",
errorMessage: `No runner found for the job`
});
return;
}
this.log("INFO", `Starting action ${event.uuid}`);
await this.model.ref(event.uuid).patch({ uuid: event.uuid, status: "STARTING" });
await this.model.ref(event.uuid).patch({
uuid: event.uuid,
status: "STARTING"
});
const action = await this.model.ref(event.uuid).get();
const job = await selectedRunner.launchAction(action, this.getJobInfo(action));
let job = await selectedRunner.launchAction(action, this.getJobInfo(action));
await action.patch({ job }, null);
return job.promise || Promise.resolve();
}
Expand Down Expand Up @@ -402,8 +410,17 @@
context: WebContext<
void,
| {
application: { name: string; version: string };
operations: { [key: string]: { id: string; input?: string; output?: string } };
application: {
name: string;
version: string;
};
operations: {
[key: string]: {
id: string;
input?: string;
output?: string;
};
};
schemas: { [key: string]: JSONSchema7 };
}
| string[]
Expand Down Expand Up @@ -525,7 +542,10 @@
* @returns
*/
getHeaders(jobInfo: JobInfo) {
let res: { [key: string]: string } = { "X-Job-Id": jobInfo.JOB_ID, "X-Job-Time": Date.now().toString() };
let res: { [key: string]: string } = {
"X-Job-Id": jobInfo.JOB_ID,
"X-Job-Time": Date.now().toString()
};
res["X-Job-Hash"] = crypto
.createHmac(AsyncJobService.HMAC_ALGO, jobInfo.JOB_SECRET_KEY)
.update(res["X-Job-Time"])
Expand All @@ -539,7 +559,7 @@
* @param message
* @returns
*/
async postHook(jobInfo: JobInfo, message: any): Promise<AsyncWebdaAction> {
async postHook(jobInfo: JobInfo, message: any): Promise<AsyncAction> {
// Http allow to break paradigm between the executor and the orchestrator
if (jobInfo.JOB_HOOK.startsWith("http")) {
return (
Expand All @@ -549,7 +569,7 @@
).data;
} else if (jobInfo.JOB_HOOK === "store") {
// If executor and orchestrator runs within same privilege it simplify the infrastructure
return <Promise<AsyncWebdaAction>>(await this.model.ref(jobInfo.JOB_ID).get()).update(message);
return <Promise<AsyncAction>>(await this.model.ref(jobInfo.JOB_ID).get()).update(message);
}
}

Expand Down Expand Up @@ -606,36 +626,57 @@
this.log("DEBUG", "Getting action to execute from hook", jobInfo);
// Get action info by calling the hook
let action = await this.postHook(jobInfo, {
agent: { ...Runner.getAgentInfo(), nodeVersion: process.version },
agent: {
...Runner.getAgentInfo(),
nodeVersion: process.version
},
status: "RUNNING"
});
this.log("DEBUG", "Action received", action.serviceName, action.method, action.arguments);
this.log("DEBUG", "Action received", action);
let results;
try {
// Check it contains the right info
if (!action.method || !action.serviceName) {
throw new Error("WebdaAsyncAction must have method and serviceName defined at least");
}
// Call the service[method](...args)
let service = this.getService(action.serviceName);
if (!service) {
throw new Error(`WebdaAsyncAction Service '${action.serviceName}' not found: mismatch app version`);
}
// @ts-ignore
if (!service[action.method]) {
throw new Error(
`WebdaAsyncAction Method '${action.method}' not found in service ${action.serviceName}: mismatch app version`
);
if ((action as AsyncWebdaAction).method && (action as AsyncWebdaAction).serviceName) {
const webdaAction = action as AsyncWebdaAction;
// Call the service[method](...args)
let service = this.getService(webdaAction.serviceName);
if (!service) {
throw new Error(`WebdaAsyncAction Service '${webdaAction.serviceName}' not found: mismatch app version`);
}
// @ts-ignore
if (!service[webdaAction.method]) {
throw new Error(
`WebdaAsyncAction Method '${webdaAction.method}' not found in service ${webdaAction.serviceName}: mismatch app version`
);
}
// @ts-ignore
results = await service[webdaAction.method](...(webdaAction.arguments || []));
} else if ((action as AsyncOperationAction).operationId) {
const operationAction = action as AsyncOperationAction;
// Call the operation
let ctx: SimpleOperationContext = new SimpleOperationContext(this.getWebda());
// Unserialization might not have happened
ctx.setSession(operationAction.context.getSession ? operationAction.context.getSession() : operationAction.context["session"]);
// Unserialization might not have happened
ctx.setInput(Buffer.from(operationAction.context["input"]?.data || []));
results = await this.callOperation(ctx);

Check warning on line 662 in packages/async/src/services/asyncjobservice.ts

View check run for this annotation

Codecov / codecov/patch

packages/async/src/services/asyncjobservice.ts#L655-L662

Added lines #L655 - L662 were not covered by tests
} else {
this.log("ERROR", "Invalid AsyncAction", action);
throw new Error(`Invalid AsyncAction`);
}
// @ts-ignore
results = await service[action.method](...(action.arguments || []));
} catch (err) {
// Job is in error
await this.postHook(jobInfo, { errorMessage: <string | undefined>err?.message, status: "ERROR" });
await this.postHook(jobInfo, {
errorMessage: <string | undefined>err?.message,
status: "ERROR"
});
return;
}
// Update status
await this.postHook(jobInfo, { results, status: "SUCCESS" });
await this.postHook(jobInfo, {
results,
status: "SUCCESS"
});
}

/**
Expand Down Expand Up @@ -679,9 +720,9 @@
time -= time % this.parameters.schedulerResolution;
// Queue all actions
await Promise.all(
(
await this.model.query(`status = 'SCHEDULED' AND scheduled < ${time + 1}`)
).results.map(a => this.launchAction(a))
(await this.model.query(`status = 'SCHEDULED' AND scheduled < ${time + 1}`)).results.map(a =>
this.launchAction(a)
)
);
time += this.parameters.schedulerResolution;
// Wait for next scheduler resolution
Expand Down
Loading