Skip to content

Commit 334d5e6

Browse files
committed
Migrate fill info retrieval to the new LHC plugin
This commit adds a new call in the LHC plugin which updates a given environment with the latest known fill information. Once we validate it, we can proceed with switching off the bkp.RetrieveFillInfo() call. A necessary change in the workflow template will have to follow. This approach is better because we don't rely on a man in the middle, thus reducing the risk of errors and of getting the required info too late (i.e. not before SOR). Closes OCTRL-1057
1 parent 6a97d97 commit 334d5e6

File tree

3 files changed

+99
-7
lines changed

3 files changed

+99
-7
lines changed

core/environment/transition_startactivity.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func (t StartActivityTransition) do(env *Environment) (err error) {
8181
// Get a handle to the consolidated var stack of the root role of the env's workflow
8282
if wf := env.Workflow(); wf != nil {
8383
if cvs, cvsErr := wf.ConsolidatedVarStack(); cvsErr == nil {
84-
// If bookkeeping is enabled and has fetched the LHC fill info, we can acquire it here
8584
for _, key := range []string{
8685
"fill_info_fill_number",
8786
"fill_info_filling_scheme",

core/integration/lhc/plugin.go

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,22 @@ import (
2828
"context"
2929
"encoding/json"
3030
"errors"
31-
"github.com/AliceO2Group/Control/common/event/topic"
32-
"github.com/AliceO2Group/Control/common/logger/infologger"
33-
pb "github.com/AliceO2Group/Control/common/protos"
3431
"io"
32+
"strconv"
3533
"strings"
3634
"sync"
3735
"time"
3836

3937
cmnevent "github.com/AliceO2Group/Control/common/event"
38+
"github.com/AliceO2Group/Control/common/event/topic"
4039
"github.com/AliceO2Group/Control/common/logger"
40+
"github.com/AliceO2Group/Control/common/logger/infologger"
41+
pb "github.com/AliceO2Group/Control/common/protos"
4142
"github.com/AliceO2Group/Control/common/utils/uid"
4243
"github.com/AliceO2Group/Control/core/environment"
4344
"github.com/AliceO2Group/Control/core/integration"
4445
lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event"
46+
"github.com/AliceO2Group/Control/core/workflow/callable"
4547
"github.com/sirupsen/logrus"
4648
"github.com/spf13/viper"
4749
)
@@ -125,8 +127,18 @@ func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string {
125127
// ObjectStack ignores both of its arguments and hands back a fresh, empty
// (non-nil) map: this plugin exposes no objects.
func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) {
	stack = map[string]interface{}{}
	return stack
}
128-
func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) {
129-
return make(map[string]interface{})
130+
func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
131+
call, ok := data.(*callable.Call)
132+
if !ok {
133+
return
134+
}
135+
136+
stack = make(map[string]interface{})
137+
stack["UpdateFillInfo"] = func() (out string) {
138+
p.updateFillInfo(call)
139+
return
140+
}
141+
return
130142
}
131143

132144
func (p *Plugin) Destroy() error {
@@ -198,3 +210,83 @@ func (p *Plugin) readAndInjectLhcUpdates() {
198210
}
199211
}
200212
}
213+
214+
// UpdateFillInfo: propagate latest LHC fill info into the environment's global runtime vars
215+
func (p *Plugin) updateFillInfo(call *callable.Call) (out string) {
216+
varStack := call.VarStack
217+
envId, ok := varStack["environment_id"]
218+
if !ok {
219+
err := errors.New("cannot acquire environment ID")
220+
log.Error(err)
221+
222+
call.VarStack["__call_error_reason"] = err.Error()
223+
call.VarStack["__call_error"] = "LHC plugin Call Stack failed"
224+
return
225+
}
226+
227+
log := log.WithFields(logrus.Fields{
228+
"partition": envId,
229+
"call": "UpdateFillInfo",
230+
})
231+
232+
parentRole, ok := call.GetParentRole().(callable.ParentRole)
233+
if !ok || parentRole == nil {
234+
log.WithField(infologger.Level, infologger.IL_Support).
235+
Error("cannot access parent role to propagate LHC fill info")
236+
return
237+
}
238+
239+
if p.currentState == nil {
240+
log.WithField(infologger.Level, infologger.IL_Support).
241+
Warn("attempted to update environment with fill info, but fill info is not available in plugin")
242+
return
243+
}
244+
245+
// note: the following was causing very weird behaviours, which could be attributed to memory corruption.
246+
// I did not manage to understand why can't we safely clone such a proto message.
247+
// state := proto.Clone(p.currentState).(*pb.BeamInfo)
248+
249+
p.mu.Lock()
250+
defer p.mu.Unlock()
251+
state := p.currentState
252+
253+
parentRole.SetGlobalRuntimeVar("fill_info_beam_mode", state.BeamMode.String())
254+
255+
// If NO_BEAM, clear all other fill info and return
256+
if state.BeamMode == pb.BeamMode_NO_BEAM {
257+
parentRole.DeleteGlobalRuntimeVar("fill_info_fill_number")
258+
parentRole.DeleteGlobalRuntimeVar("fill_info_filling_scheme")
259+
parentRole.DeleteGlobalRuntimeVar("fill_info_beam_type")
260+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms")
261+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms")
262+
263+
log.WithField(infologger.Level, infologger.IL_Devel).
264+
Debug("NO_BEAM — cleared fill info vars and set beam mode only")
265+
return
266+
}
267+
268+
// Otherwise, propagate latest known info
269+
parentRole.SetGlobalRuntimeVar("fill_info_fill_number", strconv.FormatInt(int64(state.FillNumber), 10))
270+
parentRole.SetGlobalRuntimeVar("fill_info_filling_scheme", state.FillingSchemeName)
271+
parentRole.SetGlobalRuntimeVar("fill_info_beam_type", state.BeamType)
272+
if state.StableBeamsStart > 0 {
273+
parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_start_ms", strconv.FormatInt(state.StableBeamsStart, 10))
274+
} else {
275+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms")
276+
}
277+
if state.StableBeamsEnd > 0 {
278+
parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_end_ms", strconv.FormatInt(state.StableBeamsEnd, 10))
279+
} else {
280+
parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms")
281+
}
282+
283+
log.WithField("fillNumber", state.FillNumber).
284+
WithField("fillingScheme", state.FillingSchemeName).
285+
WithField("beamType", state.BeamType).
286+
WithField("beamMode", state.BeamMode).
287+
WithField("stableStartMs", state.StableBeamsStart).
288+
WithField("stableEndMs", state.StableBeamsEnd).
289+
WithField(infologger.Level, infologger.IL_Devel).
290+
Debug("updated environment fill info from latest snapshot")
291+
return
292+
}

docs/handbook/operation_order.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ This is the order of actions happening at a healthy start of run.
3232

3333
- `before_START_ACTIVITY` hooks with negative weights are executed:
3434
- `trg.PrepareForRun()` at `-200`
35+
- `lhc.UpdateFillInfo()` at `-50`
3536
- `"run_number"` is set.
3637
- `"run_start_time_ms"` is set using the current time. It is considered as the SOR and SOSOR timestamps.
3738
- `before_START_ACTIVITY` hooks with positive weights (incl. 0) are executed:
3839
- `trg.RunLoad()`, `bookkeeping.StartOfRun()` at `10`
39-
- `bookkeeping.RetrieveFillInfo()` at `11`
4040
- `kafka.PublishStartActivityUpdate()` at `50`
4141
- `dcs.StartOfRun()`, `odc.Start()` (does not need to return now), `ccdb.RunStart()` at `100`
4242

@@ -72,6 +72,7 @@ This is the order of actions happening at a healthy end of run.
7272
### before_STOP_ACTIVITY
7373

7474
- `before_STOP_ACTIVITY` hooks with negative weights are executed:
75+
- `lhc.UpdateFillInfo()` at `-50`
7576
- `trg.RunStop()` at `-10`
7677
- `"run_end_time_ms"` is set using the current time. It is considered as the EOR and SOEOR timestamps.
7778
- `before_STOP_ACTIVITY` hooks with positive weights (incl. 0) are executed:

0 commit comments

Comments
 (0)