Skip to content

Commit cdcd86a

Browse files
author
Michal Tichák
committed
dcs
1 parent fc6eb41 commit cdcd86a

File tree

2 files changed

+51
-16
lines changed

2 files changed

+51
-16
lines changed

common/monitoring/grpcinterceptor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,33 @@ import (
3030
"google.golang.org/grpc"
3131
)
3232

33+
// EnvIDKey is the context key under which AddEnvAndRunType stores the
// environment ID. Being a distinct empty struct type, it cannot collide with
// context keys defined by other packages.
type EnvIDKey struct{}

// RunTypeKey is the context key under which AddEnvAndRunType stores the run
// type.
type RunTypeKey struct{}

// AddEnvAndRunType returns a context derived from ctx that carries envId under
// EnvIDKey{} and runType under RunTypeKey{}. Both values are stored as plain
// strings, so readers can retrieve them with ctx.Value(key).(string).
func AddEnvAndRunType(ctx context.Context, envId, runType string) context.Context {
	withEnv := context.WithValue(ctx, EnvIDKey{}, envId)
	return context.WithValue(withEnv, RunTypeKey{}, runType)
}
43+
3344
type measuredClientStream struct {
3445
grpc.ClientStream
46+
ctx context.Context
3547
method string
3648
metricName string
3749
}
3850

3951
func (t *measuredClientStream) RecvMsg(m interface{}) error {
4052
metric := NewMetric(t.metricName)
4153
metric.AddTag("method", t.method)
54+
if env, ok := t.ctx.Value(EnvIDKey{}).(string); ok {
55+
metric.AddTag("envId", env)
56+
}
57+
if rt, ok := t.ctx.Value(RunTypeKey{}).(string); ok {
58+
metric.AddTag("runtype", rt)
59+
}
4260
defer TimerSendSingle(&metric, Millisecond)()
4361

4462
err := t.ClientStream.RecvMsg(m)
@@ -63,6 +81,7 @@ func SetupStreamClientInterceptor(metricName string, convert NameConvertType) gr
6381

6482
return &measuredClientStream{
6583
ClientStream: clientStream,
84+
ctx: ctx,
6685
method: convert(method),
6786
metricName: metricName,
6887
}, nil
@@ -73,6 +92,12 @@ func SetupUnaryClientInterceptor(name string, convert NameConvertType) grpc.Unar
7392
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
7493
metric := NewMetric(name)
7594
metric.AddTag("method", convert(method))
95+
if env, ok := ctx.Value(EnvIDKey{}).(string); ok {
96+
metric.AddTag("envId", env)
97+
}
98+
if rt, ok := ctx.Value(RunTypeKey{}).(string); ok {
99+
metric.AddTag("runtype", rt)
100+
}
76101
defer TimerSendSingle(&metric, Millisecond)()
77102
return invoker(ctx, method, req, reply, cc, opts...)
78103
}

core/integration/dcs/plugin.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,18 +289,18 @@ func (p *Plugin) Init(instanceId string) error {
289289
in := &dcspb.SubscriptionRequest{
290290
InstanceId: instanceId,
291291
}
292-
292+
293293
// Always start the goroutine, even if initial subscription fails
294294
go func() {
295295
var evStream dcspb.Configurator_SubscribeClient
296296
var err error
297-
297+
298298
for {
299299
// Try to establish subscription if we don't have one
300300
if evStream == nil {
301301
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
302302
Debug("attempting to subscribe to DCS service")
303-
303+
304304
evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
305305
if err != nil {
306306
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -699,7 +699,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
699699

700700
var stream dcspb.Configurator_StartOfRunClient
701701
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId)
702-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
702+
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
703703
defer cancel()
704704

705705
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -746,7 +746,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
746746
return
747747
}
748748

749-
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
749+
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
750750

751751
dcsFailedEcsDetectors := make([]string, 0)
752752
dcsopOk := true
@@ -1064,7 +1064,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
10641064

10651065
var stream dcspb.Configurator_StartOfRunClient
10661066
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
1067-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1067+
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
10681068
defer cancel()
10691069

10701070
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -1112,7 +1112,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11121112
}
11131113
p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later
11141114

1115-
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
1115+
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
11161116

11171117
dcsFailedEcsDetectors := make([]string, 0)
11181118
dcsopOk := true
@@ -1298,7 +1298,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12981298

12991299
var stream dcspb.Configurator_EndOfRunClient
13001300
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
1301-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1301+
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
13021302
defer cancel()
13031303

13041304
payload := map[string]interface{}{
@@ -1356,7 +1356,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13561356
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
13571357
}
13581358

1359-
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
1359+
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
13601360

13611361
dcsFailedEcsDetectors := make([]string, 0)
13621362
dcsopOk := true
@@ -1452,17 +1452,27 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14521452
return
14531453
}
14541454

1455-
func newMetric(method string) monitoring.Metric {
1455+
// extractRunType returns the "run_type" entry of varStack for use as a metric
// tag value. It returns "undefined" when the key is missing or when its value
// is the empty string, so the resulting tag is never empty. A nil varStack is
// handled like an empty one.
func extractRunType(varStack map[string]string) string {
	// Indexing a nil or key-less map yields "", so a single emptiness check
	// covers the missing-key, empty-value and nil-map cases alike.
	if runType := varStack["run_type"]; runType != "" {
		return runType
	}
	return "undefined"
}
1462+
1463+
// newMetric creates a "dcsecs" metric and tags it, in this order, with the
// given method ("method"), environment ID ("envId") and run type ("runtype").
func newMetric(runType, envId, method string) monitoring.Metric {
	m := monitoring.NewMetric("dcsecs")

	// Tag name/value pairs, listed in the order they are applied.
	tags := [...][2]string{
		{"method", method},
		{"envId", envId},
		{"runtype", runType},
	}
	for _, tag := range tags {
		m.AddTag(tag[0], tag[1])
	}
	return m
}
14601470

14611471
func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
14621472
payloadJsonForKafka []byte, stream dcspb.Configurator_EndOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1463-
callFailedStr string, payload map[string]interface{},
1473+
callFailedStr string, payload map[string]interface{}, runType string,
14641474
) (error, []byte) {
1465-
metric := newMetric("EOR")
1475+
metric := newMetric(runType, envId, "EOR")
14661476
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
14671477

14681478
var dcsEvent *dcspb.RunEvent
@@ -1692,9 +1702,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16921702

16931703
func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
16941704
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1695-
callFailedStr string, payload map[string]interface{},
1705+
callFailedStr string, payload map[string]interface{}, runType string,
16961706
) (error, []byte) {
1697-
metric := newMetric("SOR")
1707+
metric := newMetric(runType, envId, "SOR")
16981708
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
16991709

17001710
var dcsEvent *dcspb.RunEvent
@@ -1961,9 +1971,9 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19611971

19621972
func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
19631973
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1964-
callFailedStr string, payload map[string]interface{},
1974+
callFailedStr string, payload map[string]interface{}, runType string,
19651975
) (error, []byte) {
1966-
metric := newMetric("PFR")
1976+
metric := newMetric(runType, envId, "PFR")
19671977
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
19681978

19691979
var err error

0 commit comments

Comments
 (0)