Skip to content

Commit c747c95

Browse files
author
Michal Tichák
committed
fixup! [core] Added monitoring to hooks and communication with outside services
1 parent f67681e commit c747c95

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

common/monitoring/metric.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package monitoring
2727
import (
2828
"fmt"
2929
"io"
30+
"sort"
3031
"time"
3132

3233
lp "github.com/influxdata/line-protocol/v2/lineprotocol"
@@ -110,6 +111,10 @@ func Format(writer io.Writer, metrics []Metric) error {
110111
var enc lp.Encoder
111112

112113
for _, metric := range metrics {
114+
// AddTag requires tags sorted lexicografically
115+
sort.Slice(metric.tags, func(i int, j int) bool {
116+
return metric.tags[i].name < metric.tags[j].name
117+
})
113118
enc.StartLine(metric.name)
114119
for _, tag := range metric.tags {
115120
enc.AddTag(tag.name, tag.value)

core/integration/dcs/client.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
7272
Timeout: time.Second,
7373
PermitWithoutStream: true,
7474
}),
75+
grpc.WithStreamInterceptor(setupStreamClientInterceptor()),
7576
}
7677
if !viper.GetBool("dcsServiceUseSystemProxy") {
7778
dialOptions = append(dialOptions, grpc.WithNoProxy())
@@ -126,6 +127,57 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
126127
return client
127128
}
128129

130+
type measuredClientStream struct {
131+
grpc.ClientStream
132+
method string
133+
}
134+
135+
func (t *measuredClientStream) RecvMsg(m interface{}) error {
136+
metric := monitoring.NewMetric("dcs_stream")
137+
metric.AddTag("method", t.method)
138+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
139+
err := t.ClientStream.RecvMsg(m)
140+
141+
return err
142+
}
143+
144+
func convertMethodName(method string) (converted string) {
145+
switch method {
146+
case dcspb.Configurator_Subscribe_FullMethodName:
147+
converted = "Subscribe"
148+
case dcspb.Configurator_PrepareForRun_FullMethodName:
149+
converted = "PFR"
150+
case dcspb.Configurator_StartOfRun_FullMethodName:
151+
converted = "SOR"
152+
case dcspb.Configurator_EndOfRun_FullMethodName:
153+
converted = "EOR"
154+
default:
155+
converted = "Unknown"
156+
}
157+
return
158+
}
159+
160+
func setupStreamClientInterceptor() grpc.StreamClientInterceptor {
161+
return func(
162+
ctx context.Context,
163+
desc *grpc.StreamDesc,
164+
cc *grpc.ClientConn,
165+
method string,
166+
streamer grpc.Streamer,
167+
opts ...grpc.CallOption,
168+
) (grpc.ClientStream, error) {
169+
clientStream, err := streamer(ctx, desc, cc, method, opts...)
170+
if err != nil {
171+
return nil, err
172+
}
173+
174+
return &measuredClientStream{
175+
ClientStream: clientStream,
176+
method: convertMethodName(method),
177+
}, nil
178+
}
179+
}
180+
129181
func (m *RpcClient) GetConnState() connectivity.State {
130182
if m.conn == nil {
131183
return connectivity.Idle
@@ -140,28 +192,28 @@ func (m *RpcClient) Close() error {
140192

141193
func (m *RpcClient) Subscribe(ctx context.Context, in *dcspb.SubscriptionRequest, opts ...grpc.CallOption) (dcspb.Configurator_SubscribeClient, error) {
142194
metric := newMetric()
143-
metric.AddTag("method", "Subscribe")
195+
metric.AddTag("stream_setup", "Subscribe")
144196
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
145197
return m.ConfiguratorClient.Subscribe(ctx, in, opts...)
146198
}
147199

148200
func (m *RpcClient) PrepareForRun(ctx context.Context, in *dcspb.PfrRequest, opts ...grpc.CallOption) (dcspb.Configurator_PrepareForRunClient, error) {
149201
metric := newMetric()
150-
metric.AddTag("method", "PFR")
202+
metric.AddTag("stream_setup", "PFR")
151203
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
152204
return m.ConfiguratorClient.PrepareForRun(ctx, in, opts...)
153205
}
154206

155207
func (m *RpcClient) StartOfRun(ctx context.Context, in *dcspb.SorRequest, opts ...grpc.CallOption) (dcspb.Configurator_StartOfRunClient, error) {
156208
metric := newMetric()
157-
metric.AddTag("method", "SOR")
209+
metric.AddTag("stream_setup", "SOR")
158210
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
159211
return m.ConfiguratorClient.StartOfRun(ctx, in, opts...)
160212
}
161213

162214
func (m *RpcClient) EndOfRun(ctx context.Context, in *dcspb.EorRequest, opts ...grpc.CallOption) (dcspb.Configurator_EndOfRunClient, error) {
163215
metric := newMetric()
164-
metric.AddTag("method", "EOR")
216+
metric.AddTag("stream_setup", "EOR")
165217
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
166218
return m.ConfiguratorClient.EndOfRun(ctx, in, opts...)
167219
}

core/workflow/callable/call.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ func (c *Call) Call() error {
114114
WithField("level", infologger.IL_Devel).
115115
Debugf("calling hook function %s", c.Func)
116116

117+
metric := c.newMetric("callable_call")
118+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
119+
117120
the.EventWriterWithTopic(topic.Call).WriteEvent(&evpb.Ev_CallEvent{
118121
Path: c.GetParentRolePath(),
119122
Func: c.Func,
@@ -221,17 +224,23 @@ func (c *Call) Call() error {
221224
return nil
222225
}
223226

227+
func (c *Call) newMetric(name string) monitoring.Metric {
228+
metric := monitoring.NewMetric(name)
229+
metric.AddTag("name", c.GetName())
230+
metric.AddTag("trigger", c.GetTraits().Trigger)
231+
metric.AddTag("envId", c.parentRole.GetEnvironmentId().String())
232+
return metric
233+
}
234+
224235
func (c *Call) Start() {
225236
c.await = make(chan error)
226237
ctx, cancel := context.WithCancel(context.Background())
227238
c.awaitCancel = cancel
228239
go func() {
229-
callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName())
230-
metric := monitoring.NewMetric("callable")
231-
metric.AddTag("id", callId)
232-
metric.AddTag("envId", c.parentRole.GetEnvironmentId().String())
240+
metric := c.newMetric("callable_wrapped")
233241
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
234242

243+
callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName())
235244
log.Debugf("%s started", callId)
236245
defer utils.TimeTrack(time.Now(), callId, log.WithPrefix("callable"))
237246
select {

0 commit comments

Comments
 (0)