Skip to content

Commit c5cae6d

Browse files
author
Michal Tichák
committed
others
1 parent cdcd86a commit c5cae6d

File tree

6 files changed

+75
-49
lines changed

6 files changed

+75
-49
lines changed

core/integration/ccdb/plugin.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
481481
return
482482
}
483483
p.existingRuns[grp.runNumber] = types.Nil{}
484-
err := p.uploadCurrentGRP(grp, envId, true)
484+
err := p.uploadCurrentGRP(grp, envId, true, varStack, "RunStart")
485485
if err != nil {
486486
log.WithField("call", "RunStop").
487487
WithField("run", grp.runNumber).
@@ -506,7 +506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
506506
_, runExists := p.existingRuns[grp.runNumber]
507507
if runExists {
508508
delete(p.existingRuns, grp.runNumber)
509-
err := p.uploadCurrentGRP(grp, envId, false)
509+
err := p.uploadCurrentGRP(grp, envId, false, varStack, "RunStop")
510510
if err != nil {
511511
log.WithField("call", "RunStop").
512512
WithField("run", grp.runNumber).
@@ -525,7 +525,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
525525
return
526526
}
527527

528-
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
528+
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool, varStack map[string]string, callName string) error {
529529
if grp == nil {
530530
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
531531
}
@@ -550,6 +550,8 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre
550550

551551
metric := monitoring.NewMetric("ccdb")
552552
metric.AddTag("envId", envId)
553+
metric.AddTag("runtype", integration.ExtractRunTypeOrUndefined(varStack))
554+
metric.AddTag("call", callName)
553555
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
554556

555557
cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)

core/integration/dcs/plugin.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
699699

700700
var stream dcspb.Configurator_StartOfRunClient
701701
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId)
702-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
702+
ctx, cancel := context.WithTimeout(
703+
monitoring.AddEnvAndRunType(context.Background(), envId,
704+
integration.ExtractRunTypeOrUndefined(varStack)), timeout)
703705
defer cancel()
704706

705707
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -746,7 +748,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
746748
return
747749
}
748750

749-
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
751+
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
752+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
750753

751754
dcsFailedEcsDetectors := make([]string, 0)
752755
dcsopOk := true
@@ -1064,7 +1067,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
10641067

10651068
var stream dcspb.Configurator_StartOfRunClient
10661069
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
1067-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
1070+
1071+
ctx, cancel := context.WithTimeout(
1072+
monitoring.AddEnvAndRunType(context.Background(), envId,
1073+
integration.ExtractRunTypeOrUndefined(varStack)), timeout)
10681074
defer cancel()
10691075

10701076
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -1112,7 +1118,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11121118
}
11131119
p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later
11141120

1115-
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
1121+
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1122+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
11161123

11171124
dcsFailedEcsDetectors := make([]string, 0)
11181125
dcsopOk := true
@@ -1298,7 +1305,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12981305

12991306
var stream dcspb.Configurator_EndOfRunClient
13001307
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
1301-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
1308+
ctx, cancel := context.WithTimeout(
1309+
monitoring.AddEnvAndRunType(context.Background(), envId,
1310+
integration.ExtractRunTypeOrUndefined(varStack)), timeout)
13021311
defer cancel()
13031312

13041313
payload := map[string]interface{}{
@@ -1356,7 +1365,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13561365
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
13571366
}
13581367

1359-
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
1368+
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1369+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
13601370

13611371
dcsFailedEcsDetectors := make([]string, 0)
13621372
dcsopOk := true
@@ -1452,14 +1462,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14521462
return
14531463
}
14541464

1455-
func extractRunType(varStack map[string]string) string {
1456-
runType, ok := varStack["run_type"]
1457-
if !ok {
1458-
runType = "undefined"
1459-
}
1460-
return runType
1461-
}
1462-
14631465
func newMetric(runType, envId, method string) monitoring.Metric {
14641466
metric := monitoring.NewMetric("dcsecs")
14651467
metric.AddTag("method", method)

core/integration/ddsched/plugin.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
"github.com/AliceO2Group/Control/common/event/topic"
4242
"github.com/AliceO2Group/Control/common/logger/infologger"
43+
"github.com/AliceO2Group/Control/common/monitoring"
4344
pb "github.com/AliceO2Group/Control/common/protos"
4445
"github.com/AliceO2Group/Control/common/utils/uid"
4546
"github.com/AliceO2Group/Control/core/environment"
@@ -163,6 +164,7 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]map[string]s
163164
EnvironmentId: envId.String(),
164165
}
165166
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("ddschedStatusTimeout"))
167+
monitoring.AddEnvAndRunType(ctx, envId.String(), "none")
166168
state, err := p.ddSchedClient.PartitionStatus(ctx, &in, grpc.EmptyCallOption{})
167169
cancel()
168170
if err != nil {
@@ -321,11 +323,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
321323
return
322324
}
323325

324-
var (
325-
response *ddpb.PartitionResponse
326-
)
326+
var response *ddpb.PartitionResponse
327327
timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId)
328-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
328+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
329329
defer cancel()
330330

331331
payload := map[string]interface{}{
@@ -574,11 +574,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
574574
return
575575
}
576576

577-
var (
578-
response *ddpb.PartitionResponse
579-
)
577+
var response *ddpb.PartitionResponse
580578
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
581-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
579+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
582580
defer cancel()
583581

584582
payload := map[string]interface{}{
@@ -821,16 +819,14 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
821819
return
822820
}
823821

824-
var (
825-
response *ddpb.PartitionResponse
826-
)
822+
var response *ddpb.PartitionResponse
827823

828824
infoReq := ddpb.PartitionInfo{
829825
EnvironmentId: envId,
830826
PartitionId: envId,
831827
}
832828
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
833-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
829+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
834830
defer cancel()
835831

836832
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{

core/integration/odc/plugin.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
common_event "github.com/AliceO2Group/Control/common/event"
4646
"github.com/AliceO2Group/Control/common/event/topic"
4747
"github.com/AliceO2Group/Control/common/logger/infologger"
48+
"github.com/AliceO2Group/Control/common/monitoring"
4849
pb "github.com/AliceO2Group/Control/common/protos"
4950
"github.com/AliceO2Group/Control/common/utils"
5051
"github.com/AliceO2Group/Control/common/utils/uid"
@@ -184,6 +185,7 @@ func (p *Plugin) GetConnectionState() string {
184185
func (p *Plugin) queryPartitionStatus() {
185186
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
186187
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
188+
monitoring.AddEnvAndRunType(ctx, "none", "none")
187189
defer cancel()
188190

189191
statusRep := &odc.StatusReply{}
@@ -239,6 +241,7 @@ func (p *Plugin) queryPartitionStatus() {
239241
defer wg.Done()
240242

241243
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
244+
monitoring.AddEnvAndRunType(ctx, "none", "none")
242245
defer cancel()
243246

244247
odcPartStateRep, err := p.odcClient.GetState(ctx, &odc.StateRequest{
@@ -1179,7 +1182,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11791182

11801183
timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId)
11811184

1182-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1185+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
11831186
defer cancel()
11841187

11851188
err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{
@@ -1292,7 +1295,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12921295
}
12931296
}
12941297

1295-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1298+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
12961299
defer cancel()
12971300
err := handleConfigure(ctx, p.odcClient, arguments, paddingTimeout, envId, call)
12981301
if err != nil {
@@ -1314,7 +1317,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13141317

13151318
callFailedStr := "EPN Reset call failed"
13161319

1317-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1320+
ctx, cancel := integration.NewContext(envId, call.VarStack, timeout)
13181321
defer cancel()
13191322
err := handleReset(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13201323
if err != nil {
@@ -1343,7 +1346,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13431346

13441347
callFailedStr := "EPN PartitionTerminate call failed"
13451348

1346-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1349+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
13471350
defer cancel()
13481351
err := handlePartitionTerminate(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13491352
if err != nil {
@@ -1414,7 +1417,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14141417
arguments["original_run_number"] = originalRunNumber
14151418
}
14161419

1417-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1420+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14181421
defer cancel()
14191422
err = handleStart(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14201423
if err != nil {
@@ -1462,7 +1465,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14621465

14631466
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)
14641467

1465-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1468+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14661469
defer cancel()
14671470
err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14681471
if err != nil {
@@ -1486,7 +1489,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14861489

14871490
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "EnsureStop", envId)
14881491

1489-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1492+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14901493
defer cancel()
14911494

14921495
state, err := handleGetState(ctx, p.odcClient, envId)
@@ -1551,7 +1554,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15511554

15521555
callFailedStr := "EPN EnsureCleanup call failed"
15531556

1554-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1557+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15551558
defer cancel()
15561559
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15571560
if err != nil {
@@ -1572,7 +1575,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15721575

15731576
callFailedStr := "EPN PreDeploymentCleanup call failed"
15741577

1575-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1578+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15761579
defer cancel()
15771580
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, "", call)
15781581
if err != nil {
@@ -1593,7 +1596,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15931596

15941597
callFailedStr := "EPN EnsureCleanupLegacy call failed"
15951598

1596-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1599+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15971600
defer cancel()
15981601
err := handleCleanupLegacy(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15991602
if err != nil {

core/integration/plugin.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
package integration
2828

2929
import (
30+
"context"
3031
"sync"
32+
"time"
3133

3234
"github.com/AliceO2Group/Control/common/logger"
35+
"github.com/AliceO2Group/Control/common/monitoring"
3336
"github.com/AliceO2Group/Control/common/utils/uid"
3437
"github.com/sirupsen/logrus"
3538
"github.com/spf13/viper"
@@ -121,7 +124,7 @@ func (p Plugins) CallStack(data interface{}) (stack map[string]interface{}) {
121124
func (p Plugins) ObjectStack(varStack map[string]string, baseConfigStack map[string]string) (stack map[string]interface{}) {
122125
stack = make(map[string]interface{})
123126

124-
//HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role
127+
// HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role
125128
stack["odc"] = map[string]interface{}{
126129
"GenerateEPNTopologyFullname": func() string {
127130
return ""
@@ -221,3 +224,20 @@ func Reset() {
221224
loaderOnce = sync.Once{}
222225
pluginLoaders = make(map[string]func() Plugin)
223226
}
227+
228+
func ExtractRunTypeOrUndefined(varStack map[string]string) string {
229+
runType, ok := varStack["run_type"]
230+
if !ok {
231+
runType = "undefined"
232+
}
233+
return runType
234+
}
235+
236+
func NewContext(envId string, varStack map[string]string, timeout time.Duration) (context.Context, context.CancelFunc) {
237+
return context.WithTimeout(
238+
monitoring.AddEnvAndRunType(context.Background(),
239+
envId,
240+
ExtractRunTypeOrUndefined(varStack),
241+
),
242+
timeout)
243+
}

0 commit comments

Comments
 (0)