Skip to content

Commit 87f0609

Browse files
committed
Migrate fill info retrieval to the new LHC plugin
This commit adds a new call in LHC plugin which updates a given environment with the latest known fill information. Once we validate it, we can proceed with switching off the bkp.RetrieveFillInfo() call. A necessary change in workflow template will have to follow. This approach is better, because we don't rely on a man in the middle, thus reduce risks of errors and getting the required info too late (i.e. not before SOR). Closes OCTRL-1057
1 parent 3949463 commit 87f0609

File tree

3 files changed

+96
-4
lines changed

3 files changed

+96
-4
lines changed

core/environment/transition_startactivity.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func (t StartActivityTransition) do(env *Environment) (err error) {
8181
// Get a handle to the consolidated var stack of the root role of the env's workflow
8282
if wf := env.Workflow(); wf != nil {
8383
if cvs, cvsErr := wf.ConsolidatedVarStack(); cvsErr == nil {
84-
// If bookkeeping is enabled and has fetched the LHC fill info, we can acquire it here
8584
for _, key := range []string{
8685
"fill_info_fill_number",
8786
"fill_info_filling_scheme",

core/integration/lhc/plugin.go

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"encoding/json"
3030
"errors"
3131
"io"
32+
"strconv"
3233
"strings"
3334
"sync"
3435
"time"
@@ -42,6 +43,7 @@ import (
4243
"github.com/AliceO2Group/Control/core/environment"
4344
"github.com/AliceO2Group/Control/core/integration"
4445
lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event"
46+
"github.com/AliceO2Group/Control/core/workflow/callable"
4547
"github.com/sirupsen/logrus"
4648
"github.com/spf13/viper"
4749
)
@@ -175,8 +177,18 @@ func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string {
175177
func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) {
176178
return make(map[string]interface{})
177179
}
178-
func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) {
179-
return make(map[string]interface{})
180+
func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
181+
call, ok := data.(*callable.Call)
182+
if !ok {
183+
return
184+
}
185+
186+
stack = make(map[string]interface{})
187+
stack["UpdateFillInfo"] = func() (out string) {
188+
p.updateFillInfo(call)
189+
return
190+
}
191+
return
180192
}
181193

182194
func (p *Plugin) Destroy() error {
@@ -248,3 +260,83 @@ func (p *Plugin) readAndInjectLhcUpdates() {
248260
}
249261
}
250262
}
263+
264+
// UpdateFillInfo: propagate latest LHC fill info into the environment's global runtime vars
265+
func (p *Plugin) updateFillInfo(call *callable.Call) (out string) {
266+
varStack := call.VarStack
267+
envId, ok := varStack["environment_id"]
268+
if !ok {
269+
err := errors.New("cannot acquire environment ID")
270+
log.Error(err)
271+
272+
call.VarStack["__call_error_reason"] = err.Error()
273+
call.VarStack["__call_error"] = "LHC plugin Call Stack failed"
274+
return
275+
}
276+
277+
log := log.WithFields(logrus.Fields{
278+
"partition": envId,
279+
"call": "UpdateFillInfo",
280+
})
281+
282+
parentRole, ok := call.GetParentRole().(callable.ParentRole)
283+
if !ok || parentRole == nil {
284+
log.WithField(infologger.Level, infologger.IL_Support).
285+
Error("cannot access parent role to propagate LHC fill info")
286+
return
287+
}
288+
289+
if p.currentState == nil {
290+
log.WithField(infologger.Level, infologger.IL_Support).
291+
Warn("attempted to update environment with fill info, but fill info is not available in plugin")
292+
return
293+
}
294+
295+
// note: the following was causing very weird behaviours, which could be attributed to memory corruption.
296+
// I did not manage to understand why can't we safely clone such a proto message.
297+
// state := proto.Clone(p.currentState).(*pb.BeamInfo)
298+
299+
p.mu.Lock()
300+
defer p.mu.Unlock()
301+
state := p.currentState
302+
303+
parentRole.SetGlobalRuntimeVar("fill_info_beam_mode", state.BeamMode.String())
304+
305+
// If NO_BEAM, clear all other fill info and return
306+
if state.BeamMode == pb.BeamMode_NO_BEAM {
307+
parentRole.DeleteGlobalRuntimeVar("fill_info_fill_number")
308+
parentRole.DeleteGlobalRuntimeVar("fill_info_filling_scheme")
309+
parentRole.DeleteGlobalRuntimeVar("fill_info_beam_type")
310+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms")
311+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms")
312+
313+
log.WithField(infologger.Level, infologger.IL_Devel).
314+
Debug("NO_BEAM — cleared fill info vars and set beam mode only")
315+
return
316+
}
317+
318+
// Otherwise, propagate latest known info
319+
parentRole.SetGlobalRuntimeVar("fill_info_fill_number", strconv.FormatInt(int64(state.FillNumber), 10))
320+
parentRole.SetGlobalRuntimeVar("fill_info_filling_scheme", state.FillingSchemeName)
321+
parentRole.SetGlobalRuntimeVar("fill_info_beam_type", state.BeamType)
322+
if state.StableBeamsStart > 0 {
323+
parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_start_ms", strconv.FormatInt(state.StableBeamsStart, 10))
324+
} else {
325+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms")
326+
}
327+
if state.StableBeamsEnd > 0 {
328+
parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_end_ms", strconv.FormatInt(state.StableBeamsEnd, 10))
329+
} else {
330+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms")
331+
}
332+
333+
log.WithField("fillNumber", state.FillNumber).
334+
WithField("fillingScheme", state.FillingSchemeName).
335+
WithField("beamType", state.BeamType).
336+
WithField("beamMode", state.BeamMode).
337+
WithField("stableStartMs", state.StableBeamsStart).
338+
WithField("stableEndMs", state.StableBeamsEnd).
339+
WithField(infologger.Level, infologger.IL_Devel).
340+
Debug("updated environment fill info from latest snapshot")
341+
return
342+
}

docs/handbook/operation_order.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ This is the order of actions happening at a healthy start of run.
3232

3333
- `before_START_ACTIVITY` hooks with negative weights are executed:
3434
- `trg.PrepareForRun()` at `-200`
35+
- `lhc.UpdateFillInfo()` at `-50`
3536
- `"run_number"` is set.
3637
- `"run_start_time_ms"` is set using the current time. It is considered as the SOR and SOSOR timestamps.
3738
- `before_START_ACTIVITY` hooks with positive weights (incl. 0) are executed:
3839
- `trg.RunLoad()`, `bookkeeping.StartOfRun()` at `10`
39-
- `bookkeeping.RetrieveFillInfo()` at `11`
4040
- `kafka.PublishStartActivityUpdate()` at `50`
4141
- `dcs.StartOfRun()`, `odc.Start()` (does not need to return now), `ccdb.RunStart()` at `100`
4242

@@ -72,6 +72,7 @@ This is the order of actions happening at a healthy end of run.
7272
### before_STOP_ACTIVITY
7373

7474
- `before_STOP_ACTIVITY` hooks with negative weights are executed
75+
- `lhc.UpdateFillInfo()` at `-50`
7576
- `trg.RunStop()` at `-10`
7677
- `"run_end_time_ms"` is set using the current time. It is considered as the EOR and SOEOR timestamps.
7778
- `before_STOP_ACTIVITY` hooks with positive weights (incl. 0) are executed:

0 commit comments

Comments
 (0)