Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,497 changes: 1,014 additions & 1,483 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion common/scripts/docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ else
--to @hcengineering/pod-billing \
--to @hcengineering/pod-process \
--to @hcengineering/pod-rating \
--to @hcengineering/pod-payment
--to @hcengineering/pod-payment \
--to @hcengineering/pod-worker
fi
18 changes: 17 additions & 1 deletion dev/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ services:
- --mode dev-container
- --smp 1
- --default-log-level=info
- --memory 256M
- --memory 512M
container_name: redpanda
volumes:
- redpanda:/var/lib/redpanda/data
Expand Down Expand Up @@ -594,6 +594,22 @@ services:
- QUEUE_CONFIG=${QUEUE_CONFIG}
- QUEUE_REGION=cockroach
restart: unless-stopped
time-machine:
image: hardcoreeng/worker
extra_hosts:
- 'huly.local:host-gateway'
depends_on:
redpanda:
condition: service_started
account:
condition: service_started
cockroach:
condition: service_started
environment:
- DB_URL=${DB_CR_URL}
- QUEUE_CONFIG=${QUEUE_CONFIG}
- QUEUE_REGION=cockroach
restart: unless-stopped
# translate:
# image: hardcoreeng/translate
# extra_hosts:
Expand Down
4 changes: 3 additions & 1 deletion foundations/server/packages/core/src/queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export enum QueueTopic {
CalendarEventCUD = 'calendarEventCUD',

// A topic about process events
Process = 'process'
Process = 'process',

TimeMachine = 'timeMachine'
}

export interface ConsumerHandle {
Expand Down
10 changes: 9 additions & 1 deletion server-plugins/process/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Card } from '@hcengineering/card'
import { CollaboratorClient } from '@hcengineering/collaborator-client'
import { Doc, MeasureContext, PersonId, Ref, Timestamp, Tx, TxOperations, WorkspaceUuid } from '@hcengineering/core'
import { Execution, ExecutionError, MethodParams, Trigger, UserResult } from '@hcengineering/process'
import { CollaboratorClient } from '@hcengineering/collaborator-client'

export type ExecuteFunc = (
params: MethodParams<Doc>,
Expand Down Expand Up @@ -51,3 +51,11 @@ export interface ProcessControl {
}

export type RollbackFunc = (context: Record<string, any>, control: ProcessControl) => Tx

export interface TimeMachineMessage {
type: 'schedule' | 'cancel'
id: string
targetDate?: Timestamp
topic?: string
data?: any
}
1 change: 0 additions & 1 deletion services/process/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
"@hcengineering/account-client": "workspace:^0.7.21",
"@hcengineering/communication-types": "workspace:^0.7.12",
"@hcengineering/communication-sdk-types": "workspace:^0.7.12",
"@temporalio/client": "1.12.3",
"dotenv": "^16.4.5"
}
}
2 changes: 0 additions & 2 deletions services/process/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import { join } from 'path'
import config from './config'
import { prepare } from './init'
import { messageHandler } from './main'
import { closeTemporal } from './temporal'
import { SERVICE_NAME } from './utils'

async function main (): Promise<void> {
Expand Down Expand Up @@ -63,7 +62,6 @@ async function main (): Promise<void> {
)

const shutdown = (): void => {
void closeTemporal()
void Promise.all([consumer.close()]).then(() => {
process.exit()
})
Expand Down
83 changes: 38 additions & 45 deletions services/process/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
//

import cardPlugin, { Card } from '@hcengineering/card'
import { CreateMessageEvent, MessageEventType } from '@hcengineering/communication-sdk-types'
import { ActivityProcess, ActivityUpdateType, MessageType } from '@hcengineering/communication-types'
import core, {
Doc,
generateId,
Expand All @@ -30,6 +32,7 @@ import core, {
TxUpdateDoc,
WorkspaceUuid
} from '@hcengineering/core'
import { getPlatformQueue } from '@hcengineering/kafka'
import { getResource } from '@hcengineering/platform'
import process, {
Execution,
Expand All @@ -49,22 +52,19 @@ import process, {
Trigger,
UserResult
} from '@hcengineering/process'
import { QueueTopic } from '@hcengineering/server-core'
import serverProcess, {
ExecuteResult,
MethodImpl,
ProcessControl,
ProcessMessage,
TimeMachineMessage,
TriggerImpl
} from '@hcengineering/server-process'
import { getContextValue } from '@hcengineering/server-process-resources'
import { Client as TemporalClient } from '@temporalio/client'
import config from './config'
import { isError } from './errors'
import { getTemporalClient } from './temporal'
import { getClient, releaseClient } from './utils'
import { CreateMessageEvent, MessageEventType } from '@hcengineering/communication-sdk-types'
import { ActivityUpdateType, ActivityProcess, MessageType } from '@hcengineering/communication-types'
import { createCollaboratorClient } from './collaborator'
import { isError } from './errors'
import { getClient, releaseClient, SERVICE_NAME } from './utils'

const activeExecutions = new Set<Ref<Execution>>()

Expand Down Expand Up @@ -608,9 +608,8 @@ async function updateExecutionTimers (control: ProcessControl, execution: Execut
trigger: process.trigger.OnTime
})
if (transitions.length === 0) return
const temporalClient = await getTemporalClient()
for (const transition of transitions) {
await setTimer(control, execution, transition, temporalClient)
await setTimer(control, execution, transition)
}
} catch (err) {
control.ctx.error('Error setting next timers:', { error: err, execution: execution._id })
Expand All @@ -619,66 +618,60 @@ async function updateExecutionTimers (control: ProcessControl, execution: Execut

async function setNextTimers (control: ProcessControl, execution: Execution): Promise<void> {
try {
const temporalClient = await getTemporalClient()
await cleanTimers(execution, temporalClient)
await cleanTimers(control, execution)
const transitions = control.client.getModel().findAllSync(process.class.Transition, {
from: execution.currentState,
process: execution.process,
trigger: process.trigger.OnTime
})
for (const transition of transitions) {
await setTimer(control, execution, transition, temporalClient)
await setTimer(control, execution, transition)
}
} catch (err) {
control.ctx.error('Error setting next timers:', { error: err, execution: execution._id })
}
}

async function cleanTimers (execution: Execution, temporalClient: TemporalClient): Promise<void> {
async function cleanTimers (control: ProcessControl, execution: Execution): Promise<void> {
try {
const res = await temporalClient.workflowService.listWorkflowExecutions({
namespace: config.TemporalNamespace,
query: `WorkflowType="processTimeWorkflow" AND ExecutionStatus="Running" AND ProcessExecution="${execution._id}"`
})

for (const ex of res.executions) {
try {
await temporalClient.workflowService.terminateWorkflowExecution({
workflowExecution: {
workflowId: ex.execution?.workflowId,
runId: ex.execution?.runId
},
reason: 'Outdated'
})
} catch (err) {
console.error('Error terminating workflow execution:', err)
const queue = getPlatformQueue(SERVICE_NAME)
const producer = queue.getProducer<TimeMachineMessage>(control.ctx, QueueTopic.TimeMachine)
await producer.send(control.ctx, control.workspace, [
{
type: 'cancel',
id: `${execution._id}_%`
}
}
])
} catch (err) {
console.error('Error cleaning timers:', err)
}
}

async function setTimer (
control: ProcessControl,
execution: Execution,
transition: Transition,
temporalClient: TemporalClient
): Promise<void> {
async function setTimer (control: ProcessControl, execution: Execution, transition: Transition): Promise<void> {
const filled = await fillParams(transition.triggerParams, execution, control)
const targetDate: number = filled.value
if (targetDate === undefined || typeof targetDate !== 'number' || targetDate === 0 || Number.isNaN(targetDate)) return
try {
await temporalClient.workflow.signalWithStart('processTimeWorkflow', {
taskQueue: 'process',
signal: 'setDate',
args: [targetDate, control.workspace, execution._id],
signalArgs: [targetDate],
workflowId: `${execution._id}_${transition._id}`,
searchAttributes: {
ProcessExecution: [execution._id]
const queue = getPlatformQueue(SERVICE_NAME)
const producer = queue.getProducer<TimeMachineMessage>(control.ctx, QueueTopic.TimeMachine)

const data: ProcessMessage = {
account: core.account.System,
event: [process.trigger.OnTime],
createdOn: Date.now(),
context: {},
execution: execution._id
}

await producer.send(control.ctx, control.workspace, [
{
type: 'schedule',
id: `${execution._id}_${transition._id}`,
targetDate,
topic: QueueTopic.Process,
data
}
})
])
} catch (e) {
console.error('Error setting timer:', e)
}
Expand Down
41 changes: 0 additions & 41 deletions services/process/src/temporal.ts

This file was deleted.

11 changes: 1 addition & 10 deletions services/worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
FROM node:20-bullseye-slim

RUN apt-get update \
&& apt-get install -y ca-certificates \
&& rm -rf /var/lib/apt/lists/*

FROM hardcoreeng/base-slim:v20250916
WORKDIR /usr/src/app

ENV NODE_ENV=production
RUN npm install --ignore-scripts=false --verbose @temporalio/worker @temporalio/workflow --unsafe-perm

COPY bundle/bundle.js ./
COPY bundle/workflows.js ./

CMD [ "node", "bundle.js" ]
44 changes: 44 additions & 0 deletions services/worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Time Machine Service

The Time Machine service is an autonomous, generic service responsible for handling delayed events (timers). It replaces the previous Temporal implementation with a database-backed polling mechanism and Kafka-based communication.

## How it works

1. **Commands**: The service consumes commands from the `TimeMachine` Kafka topic.
2. **Storage**: Scheduled events are stored in a PostgreSQL table `time_machine.delayed_events`.
3. **Polling**: The service periodically polls the database for expired events.
4. **Events**: When an event expires, the service sends the stored `data` to the specified `topic` via Kafka and removes the record from its database.

## Kafka Interactions

### Consumed (Incoming)
**Topic**: `TimeMachine` (`timeMachine`)
**Message Type**: `TimeMachineMessage`

| Type | Description |
| :--- | :--- |
| `schedule` | Schedules a new timer or updates an existing one. Requires `id`, `targetDate`, `topic`, and `data`. |
| `cancel` | Removes scheduled timers. The `id` supports pattern matching via `ILIKE` (e.g., `prefix_%`). |

### Produced (Outgoing)
**Topic**: Dynamic (specified in `schedule` command)
**Message Type**: Arbitrary JSON (stored in `data`)

When a timer expires, the service relays the exact `data` payload to the target `topic`.

## Environment Variables

| Variable | Default | Description |
| :--- | :--- | :--- |
| `DB_URL` | `postgres://localhost:5432/huly` | Connection string for the PostgreSQL database. |
| `POLL_INTERVAL` | `5000` | Polling interval for expired events in milliseconds. |
| `QUEUE_CONFIG` | - | Kafka bootstrap servers configuration. |
| `QUEUE_REGION` | `cockroach` | Platform region configuration. |

## Database Schema

The service automatically initializes its own schema if it doesn't exist:
- **Schema**: `time_machine`
- **Table**: `delayed_events`
- **Columns**: `id` (text), `workspace` (uuid), `target_date` (int8), `topic` (text), `data` (jsonb).
- **Primary Key**: `(id, workspace)`
26 changes: 0 additions & 26 deletions services/worker/esbuild.js

This file was deleted.

Loading
Loading