Skip to content

Commit 7c50c94

Browse files
authored
[OGUI-1554]DCS SOR panel (#2601)
1 parent f174ea3 commit 7c50c94

File tree

15 files changed

+481
-7
lines changed

15 files changed

+481
-7
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
/**
15+
* DcsIntegratedEventAdapter - Given an AliECS Integrated Service Event for DCS.SOR, build a DCS Integrated Event
16+
*
17+
* The DCS SOR event is a special event that comes from either:
18+
* * the DCS service itself (when containing the payload "dcsEvent") and it is for one detector only
19+
* * the ECS service which describes steps being executed for all detectors involved
20+
*/
21+
class DcsIntegratedEventAdapter {
22+
/**
23+
* DcsIntegratedEventAdapter
24+
*/
25+
constructor() {
26+
}
27+
28+
/**
29+
* Build a DCS Integrated Event from an AliECS Integrated Service Event. If it is a DCSevent, the detector will replace detectors array
30+
* @param {object} event - AliECS Integrated Service Event
31+
* @param {number} timestamp - timestamp of the event (int64 as per proto file definition)
32+
* @return {object} DCS Integrated Event
33+
*/
34+
static buildDcsIntegratedEvent(event, timestamp) {
35+
const { name, error, environmentId, payload } = event;
36+
const { operationName, operationStatus, operationStep, operationStepStatus } = event;
37+
38+
const payloadJSON = JSON.parse(payload);
39+
const { dcsEvent, runNumber, detector = null, state } = payloadJSON;
40+
if (!dcsEvent) {
41+
return null;
42+
}
43+
let { detectors } = payloadJSON;
44+
45+
if (detector) {
46+
// event comes with information also from DCS and it comes per detector for SOR so we override detectors
47+
detectors = [detector];
48+
}
49+
50+
return {
51+
name,
52+
timestamp: Number(timestamp),
53+
error,
54+
environmentId,
55+
runNumber,
56+
state,
57+
operationName,
58+
operationStatus,
59+
operationStep,
60+
operationStepStatus,
61+
detectors
62+
};
63+
}
64+
}
65+
66+
exports.DcsIntegratedEventAdapter = DcsIntegratedEventAdapter;

Control/lib/api.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* or submit itself to any jurisdiction.
1313
*/
1414

15+
const { Kafka, logLevel } = require('kafkajs');
1516
const logger = (require('@aliceo2/web-ui').LogManager)
1617
.getLogger(`${process.env.npm_config_log_label ?? 'cog'}/api`);
1718
const config = require('./config/configProvider.js');
@@ -45,6 +46,7 @@ const {WorkflowTemplateService} = require('./services/WorkflowTemplate.service.j
4546
const {NotificationService, ConsulService} = require('@aliceo2/web-ui');
4647

4748
// AliECS Core
49+
const { AliEcsSynchronizer } = require('./control-core/AliEcsSynchronizer.js');
4850
const AliecsRequestHandler = require('./control-core/RequestHandler.js');
4951
const ApricotService = require('./control-core/ApricotService.js');
5052
const ControlService = require('./control-core/ControlService.js');
@@ -100,19 +102,32 @@ module.exports.setup = (http, ws) => {
100102
aliecsReqHandler.setWs(ws);
101103
aliecsReqHandler.workflowService = workflowService;
102104

103-
const envCache = new EnvCache(ctrlService, envService);
105+
const envCache = new EnvCache(ctrlService, envService, cacheService);
104106
envCache.setWs(ws);
105107

106108
const bkpService = new BookkeepingService(config.bookkeeping ?? {});
107109
const runService = new RunService(bkpService, apricotService, cacheService);
108110
runService.retrieveStaticConfigurations();
109111
const runController = new RunController(runService, cacheService);
110112

111-
const notificationService = new NotificationService(config.kafka);
113+
const notificationService = new NotificationService();
112114
if (notificationService.isConfigured()) {
113115
notificationService.proxyWebNotificationToWs(ws);
114116
}
115117

118+
let aliEcsSynchronizer = undefined;
119+
if (config.kafka && config.kafka?.enable) {
120+
const kafkaClient = new Kafka({
121+
clientId: 'control-gui',
122+
brokers: config.kafka.brokers,
123+
retry: { retries: 3 },
124+
logLevel: logLevel.NOTHING,
125+
});
126+
127+
aliEcsSynchronizer = new AliEcsSynchronizer(kafkaClient, cacheService);
128+
aliEcsSynchronizer.start();
129+
}
130+
116131
const statusService = new StatusService(
117132
config, ctrlService, consulService, apricotService, notificationService, wsService,
118133
);

Control/lib/common/cacheKeys.enum.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
const CacheKeys = Object.freeze({
1919
CALIBRATION_RUNS_BY_DETECTOR: 'CALIBRATION_RUNS_BY_DETECTOR',
2020
CALIBRATION_RUNS_REQUESTS: 'CALIBRATION_RUNS_REQUESTS',
21+
DCS: {
22+
SOR: 'DCS.SOR',
23+
}
2124
});
2225

2326
exports.CacheKeys = CacheKeys;
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
const protobuf = require('protobufjs');
14+
const path = require('node:path');
15+
const { LogManager } = require('@aliceo2/web-ui');
16+
17+
const protoDir = path.resolve(__dirname, './../../protobuf/protos');
18+
const root = protobuf.loadSync(path.resolve(protoDir, 'events.proto'));
19+
const EventMessage = root.lookupType('events.Event');
20+
21+
/**
22+
* @callback MessageReceivedCallback
23+
* @param {EventMessage} message received message
24+
* @return {Promise<void>}
25+
*/
26+
27+
/**
28+
* Consumer that consume ECS event messages and pass them to previously-registered listeners
29+
* @author Martin Boulais <mboulais@cern.ch>
30+
* Until consumer is added in the common library, consumer was extracted from:
31+
* - https://github.com/AliceO2Group/Bookkeeping/blob/main/lib/server/kafka/AliEcsEventMessagesConsumer.js
32+
*/
33+
class AliEcsEventMessagesConsumer {
34+
/**
35+
* Constructor
36+
*
37+
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
38+
* @param {string} groupId the group id to use for the kafka consumer
39+
* @param {string[]} topics the list of topics to consume
40+
*/
41+
constructor(kafkaClient, groupId, topics) {
42+
this.consumer = kafkaClient.consumer({ groupId });
43+
this._topics = topics;
44+
45+
/**
46+
* @type {MessageReceivedCallback[]}
47+
* @private
48+
*/
49+
this._listeners = [];
50+
51+
this._logger = LogManager.getLogger('cog/ecs-event-consumer');
52+
}
53+
54+
/**
55+
* Register a listener to listen on event message being received
56+
*
57+
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
58+
*
59+
* @param {MessageReceivedCallback} listener the listener to register
60+
* @return {void}
61+
*/
62+
onMessageReceived(listener) {
63+
this._listeners.push(listener);
64+
}
65+
66+
/**
67+
* Start the kafka consumer
68+
*
69+
* @return {Promise<void>} Resolves once the consumer started to consume messages
70+
*/
71+
async start() {
72+
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
73+
await this.consumer.connect();
74+
await this.consumer.subscribe({ topics: this._topics });
75+
await this.consumer.run({
76+
eachMessage: async ({ message, topic }) => {
77+
const error = EventMessage.verify(message.value);
78+
if (error) {
79+
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
80+
return;
81+
}
82+
await this._handleEvent(
83+
EventMessage.toObject(
84+
EventMessage.decode(message.value),
85+
{ enums: String },
86+
)
87+
);
88+
},
89+
});
90+
}
91+
92+
/**
93+
* Call every registered listeners by passing the given message to it
94+
*
95+
* @param {EventMessage} message the message to pass to listeners
96+
* @return {void}
97+
*/
98+
async _handleEvent(message) {
99+
for (const listener of this._listeners) {
100+
try {
101+
await listener(message);
102+
} catch (error) {
103+
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
104+
}
105+
}
106+
}
107+
}
108+
109+
exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
const { AliEcsEventMessagesConsumer } = require('./AliEcsEventMessagesConsumer.js');
15+
const { DcsIntegratedEventAdapter } = require('../adapters/DcsIntegratedEventAdapter.js');
16+
const { CacheKeys } = require('../common/cacheKeys.enum.js');
17+
const { LogManager } = require('@aliceo2/web-ui');
18+
19+
const INTEGRATED_SERVICES_CONSUMER_GROUP = 'cog-integrated-services';
20+
const INTEGRATED_SERVICES_TOPICS = ['aliecs.integrated_service.dcs'];
21+
const SOR_EVENT_NAME = 'readout-dataflow.dcs.sor';
22+
23+
/**
24+
* Utility synchronizing AliECS data into control-gui, listening to kafka
25+
*/
26+
class AliEcsSynchronizer {
27+
/**
28+
* Constructor
29+
*
30+
* @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
31+
* @param {CacheService} cacheService - instance of CacheService
32+
*/
33+
constructor(kafkaClient, cacheService) {
34+
this._cacheService = cacheService;
35+
this._logger = LogManager.getLogger('cog/ali-ecs-synchronizer');
36+
37+
this._ecsIntegratedServiceConsumer = new AliEcsEventMessagesConsumer(
38+
kafkaClient,
39+
INTEGRATED_SERVICES_CONSUMER_GROUP,
40+
INTEGRATED_SERVICES_TOPICS
41+
);
42+
this._ecsIntegratedServiceConsumer.onMessageReceived(async (eventMessage) => {
43+
const { timestamp, integratedServiceEvent } = eventMessage;
44+
try {
45+
if (integratedServiceEvent.name === SOR_EVENT_NAME) {
46+
const dcsSorEvent = DcsIntegratedEventAdapter.buildDcsIntegratedEvent(integratedServiceEvent, timestamp);
47+
if (!dcsSorEvent) {
48+
return;
49+
}
50+
const { environmentId } = dcsSorEvent;
51+
let cachedDcsSteps = this._cacheService.getByKey(CacheKeys.DCS.SOR);
52+
if (!cachedDcsSteps) {
53+
cachedDcsSteps = {};
54+
}
55+
if (!cachedDcsSteps?.[environmentId]) {
56+
cachedDcsSteps[environmentId] = {
57+
displayCache: true,
58+
dcsOperations: [dcsSorEvent]
59+
};
60+
} else {
61+
cachedDcsSteps[environmentId].dcsOperations.push(dcsSorEvent);
62+
}
63+
cachedDcsSteps[environmentId].dcsOperations.sort((a, b) => a.timestamp - b.timestamp);
64+
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, cachedDcsSteps, {command: CacheKeys.DCS.SOR});
65+
}
66+
} catch (error) {
67+
this._logger.errorMessage(`Error when parsing event message: ${error.message}\n${error.trace}`);
68+
}
69+
});
70+
}
71+
72+
/**
73+
* Start the synchronization process
74+
*
75+
* @return {void}
76+
*/
77+
start() {
78+
this._logger.infoMessage('Starting to consume AliECS messages for integrated services');
79+
this._ecsIntegratedServiceConsumer
80+
.start()
81+
.catch((error) =>
82+
this._logger.errorMessage(
83+
`Error when starting ECS integrated services consumer: ${error.message}\n${error.trace}`
84+
)
85+
);
86+
}
87+
}
88+
89+
exports.AliEcsSynchronizer = AliEcsSynchronizer;

Control/lib/control-core/EnvCache.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
const {WebSocketMessage, LogManager} = require('@aliceo2/web-ui');
1616
const logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'cog'}/envcache`);
1717
const assert = require('assert');
18+
const { CacheKeys } = require('../common/cacheKeys.enum.js');
1819

1920
/**
2021
* Caches AliECS core GetEnvironments response
@@ -24,9 +25,11 @@ class EnvCache {
2425
/**
2526
* @param {object} ctrlService - Handle to Control service
2627
* @param {EnvironmentService} environmentService - service to be used to retrieve information on environments
28+
* @param {CacheService} cacheService - service to be used to retrieve information from cache
2729
*/
28-
constructor(ctrlService, environmentService) {
30+
constructor(ctrlService, environmentService, cacheService) {
2931
this.ctrlService = ctrlService;
32+
this._cacheService = cacheService;
3033
this.cache = {};
3134
this.timeout = 9000;
3235
this.cacheEvictionTimeout = 5 * 60 * 1000;
@@ -88,6 +91,14 @@ class EnvCache {
8891
for (let [index, currentEnv] of envs.environments.entries()) {
8992
try {
9093
const environment = await this._environmentService.getEnvironment(currentEnv.id);
94+
if (environment.state === 'RUNNING' && !environment.currentTransition) {
95+
// if environment reached a stable running state, hide the display cache SOR information from future ERROR states
96+
const dcsCache = this._cacheService.getByKey(CacheKeys.DCS.SOR);
97+
if (dcsCache?.[environment.id]) {
98+
dcsCache[environment.id].displayCache = false;
99+
}
100+
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, dcsCache,{command: CacheKeys.DCS.SOR});
101+
}
91102
envs.environments[index] = environment;
92103
this._updateCache(envs);
93104
} catch (error) {

Control/package-lock.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Control/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
"@aliceo2/web-ui": "2.7.1",
3737
"@grpc/grpc-js": "1.12.0",
3838
"@grpc/proto-loader": "0.7.0",
39-
"google-protobuf": "3.21.0"
39+
"google-protobuf": "3.21.0",
40+
"kafkajs": "2.2.4"
4041
},
4142
"bundledDependencies": [
4243
"@aliceo2/web-ui",

0 commit comments

Comments
 (0)