diff --git a/plugins/process-resources/src/utils.ts b/plugins/process-resources/src/utils.ts index 9fcf57024c7..b1fd6683c70 100644 --- a/plugins/process-resources/src/utils.ts +++ b/plugins/process-resources/src/utils.ts @@ -449,7 +449,7 @@ export async function requestUserInput ( userContext = { ...userContext, ...tr } changed = true } - const sub = await getSubProcessesUserInput(space, target, userContext) + const sub = await getSubProcessesUserInput(execution, space, target, userContext) if (sub !== undefined) { userContext = { ...userContext, ...sub } changed = true @@ -543,6 +543,7 @@ function getEmptyContext (): ExecutionContext { } export async function getSubProcessesUserInput ( + execution: Execution, space: Ref, transition: Transition, userContext: ExecutionContext @@ -552,8 +553,28 @@ export async function getSubProcessesUserInput ( if (action.methodId !== process.method.RunSubProcess) continue const processId = action.params._id as Ref if (processId === undefined) continue - const context = action.params.context ?? getEmptyContext() - const res = await newExecutionUserInput(processId, space, context) + const context: ExecutionContext = action.params.context ?? getEmptyContext() + const initTransition = getClient().getModel().findAllSync(process.class.Transition, { + process: processId, + from: null + })[0] + if (initTransition === undefined) continue + const card = action.params.card ?? execution.card + const client = getClient() + const mockExecution: Execution = { + _id: generateId(), + process: processId, + currentState: initTransition.to, + card, + rollback: [], + context, + status: ExecutionStatus.Active, + space, + _class: process.class.Execution, + modifiedOn: 0, + modifiedBy: client.user + } + const res = await newExecutionUserInput(processId, space, mockExecution) if (action.context == null || res === undefined) continue userContext[action.context._id] = res changed = true diff --git a/server-plugins/process-resources/src/functions.ts b/server-plugins/process-resources/src/functions.ts index 7b8bf73c735..46be631c0ab 100644 --- a/server-plugins/process-resources/src/functions.ts +++ b/server-plugins/process-resources/src/functions.ts @@ -424,12 +424,6 @@ export async function RunSubProcess ( } } - // check card is exists - const exists = await control.client.findOne(cardPlugin.class.Card, { _id: _card }) - if (exists === undefined) { - throw processError(process.error.ObjectNotFound, { _id: _card }) - } - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const context = params.context ?? ({} as ExecutionContext) const _id = generateId() diff --git a/server-plugins/process-resources/src/index.ts b/server-plugins/process-resources/src/index.ts index 2b651e96d3d..3d4ad2fef61 100644 --- a/server-plugins/process-resources/src/index.ts +++ b/server-plugins/process-resources/src/index.ts @@ -157,6 +157,7 @@ export async function OnProcessToDoClose (txes: Tx[], control: TriggerControl): event: events, execution: todo.execution, createdOn: tx.modifiedOn, + _id: tx._id, context: { todo } @@ -194,6 +195,7 @@ export async function OnCustomEvent (txes: Tx[], control: TriggerControl): Promi execution: customEvent.execution, createdOn: tx.modifiedOn, card: customEvent.card, + _id: tx._id, context: { eventType: customEvent.eventType, card @@ -216,6 +218,7 @@ export async function OnExecutionCreate (txes: Tx[], control: TriggerControl): P event: [process.trigger.OnExecutionStart], execution: execution._id, createdOn: tx.modifiedOn, + _id: tx._id, context: {} }, control @@ -237,6 +240,7 @@ export async function OnProcessToDoRemove (txes: Tx[], control: TriggerControl): event: [process.trigger.OnToDoRemove], execution: removedTodo.execution, createdOn: tx.modifiedOn, + _id: tx._id, context: { todo: removedTodo } @@ -266,6 +270,7 @@ export async function OnExecutionContinue (txes: Tx[], control: TriggerControl): event: [process.trigger.OnExecutionContinue], execution: execution._id, createdOn: tx.modifiedOn, + _id: tx._id, context: {} }, control @@ -476,6 +481,7 @@ export async function OnCardUpdate (txes: Tx[], control: TriggerControl): Promis event: [process.trigger.OnCardUpdate, process.trigger.WhenFieldChanges], card: cudTx.objectId, createdOn: tx.modifiedOn, + _id: tx._id, context: { card: card[0], operations: ops ?? {} diff --git a/server-plugins/process/src/types.ts b/server-plugins/process/src/types.ts index 3393ad474ee..29eef3325dc 100644 --- a/server-plugins/process/src/types.ts +++ b/server-plugins/process/src/types.ts @@ -31,6 +31,7 @@ export type TransformFunc = ( ) => Promise export interface ProcessMessage { + _id?: string account: PersonId createdOn: Timestamp event: Ref[] diff --git a/services/process/src/main.ts b/services/process/src/main.ts index fd76474a505..d1dc33fa3b6 100644 --- a/services/process/src/main.ts +++ b/services/process/src/main.ts @@ -68,9 +68,24 @@ import { getClient, releaseClient, SERVICE_NAME } from './utils' import config from './config' const activeExecutions = new Set>() +const processedMessages = new Map() +const MAX_PROCESSED_MESSAGES = 1000 export async function messageHandler (record: ProcessMessage, ws: WorkspaceUuid, ctx: MeasureContext): Promise { if (record.account === core.account.ConfigUser) return + if (record._id !== undefined) { + if (processedMessages.has(record._id)) { + ctx.info('Skipping duplicate message', { _id: record._id, ws, record }) + return + } + processedMessages.set(record._id, Date.now()) + if (processedMessages.size > MAX_PROCESSED_MESSAGES) { + const first = processedMessages.keys().next().value + if (first !== undefined) { + processedMessages.delete(first) + } + } + } try { const client = new TxOperations(await getClient(ws), record.account) try { @@ -658,6 +673,7 @@ async function setTimer (control: ProcessControl, execution: Execution, transiti const producer = queue.getProducer(control.ctx, QueueTopic.TimeMachine) const data: ProcessMessage = { + _id: `${execution._id}_${transition._id}`, account: core.account.System, event: [process.trigger.OnTime], createdOn: Date.now(),