Skip to content

Commit 64016b0

Browse files
author
Michal Tichák
committed
[core] Added monitoring to hooks and communication with outside services
- all protofiles are regenerated with new protoc to get *FullName* strings in grpc files - we monitor every trigger - we monitor every process run by a trigger (more granularity) - we monitor outside communication with services defined in integration - DCS: we monitor duration of it's stream Recv() function - DCS: we monitor duration of whole gRPC stream communication for PFR, SOR, EOR
1 parent c187adf commit 64016b0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2060
-1688
lines changed

apricot/protos/apricot.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apricot/protos/apricot_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coconut/protos/o2control.pb.go

Lines changed: 756 additions & 743 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coconut/protos/o2control_grpc.pb.go

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/event/writer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
110110
metric.SetFieldUInt64("messages_failed", 0)
111111

112112
metricDuration := writer.newMetric(KAFKAWRITER)
113-
defer monitoring.SendHistogrammable(&metricDuration)
114-
defer monitoring.TimerNS(&metricDuration)()
113+
defer monitoring.TimerSendHist(&metricDuration, monitoring.Nanosecond)()
115114

116115
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
117116
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
@@ -250,8 +249,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
250249

251250
metric := w.newMetric(KAFKAPREPARE)
252251

253-
defer monitoring.SendHistogrammable(&metric)
254-
defer monitoring.TimerNS(&metric)()
252+
defer monitoring.TimerSendHist(&metric, monitoring.Nanosecond)()
255253

256254
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
257255
if err != nil {
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package monitoring
26+
27+
import (
28+
"context"
29+
30+
"google.golang.org/grpc"
31+
)
32+
33+
type measuredClientStream struct {
34+
grpc.ClientStream
35+
method string
36+
metricName string
37+
}
38+
39+
func (t *measuredClientStream) RecvMsg(m interface{}) error {
40+
metric := NewMetric(t.metricName)
41+
metric.AddTag("method", t.method)
42+
defer TimerSendSingle(&metric, Millisecond)()
43+
44+
err := t.ClientStream.RecvMsg(m)
45+
return err
46+
}
47+
48+
type NameConvertType func(string) string
49+
50+
func SetupStreamClientInterceptor(metricName string, convert NameConvertType) grpc.StreamClientInterceptor {
51+
return func(
52+
ctx context.Context,
53+
desc *grpc.StreamDesc,
54+
cc *grpc.ClientConn,
55+
method string,
56+
streamer grpc.Streamer,
57+
opts ...grpc.CallOption,
58+
) (grpc.ClientStream, error) {
59+
clientStream, err := streamer(ctx, desc, cc, method, opts...)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
return &measuredClientStream{
65+
ClientStream: clientStream,
66+
method: convert(method),
67+
metricName: metricName,
68+
}, nil
69+
}
70+
}
71+
72+
func SetupUnaryClientInterceptor(name string, convert NameConvertType) grpc.UnaryClientInterceptor {
73+
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
74+
metric := NewMetric(name)
75+
metric.AddTag("method", convert(method))
76+
defer TimerSendSingle(&metric, Millisecond)()
77+
return invoker(ctx, method, req, reply, cc, opts...)
78+
}
79+
}

common/monitoring/metric.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package monitoring
2727
import (
2828
"fmt"
2929
"io"
30+
"sort"
3031
"time"
3132

3233
lp "github.com/influxdata/line-protocol/v2/lineprotocol"
@@ -110,6 +111,10 @@ func Format(writer io.Writer, metrics []Metric) error {
110111
var enc lp.Encoder
111112

112113
for _, metric := range metrics {
114+
// AddTag requires tags sorted lexicografically
115+
sort.Slice(metric.tags, func(i int, j int) bool {
116+
return metric.tags[i].name < metric.tags[j].name
117+
})
113118
enc.StartLine(metric.name)
114119
for _, tag := range metric.tags {
115120
enc.AddTag(tag.name, tag.value)

common/monitoring/monitoring_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ func TestMetricsHistogramObject(t *testing.T) {
546546
}
547547

548548
func measureFunc(metric *Metric) {
549-
defer TimerMS(metric)()
550-
defer TimerNS(metric)()
549+
defer Timer(metric, Millisecond)()
550+
defer Timer(metric, Nanosecond)()
551551
time.Sleep(100 * time.Millisecond)
552552
}
553553

common/monitoring/timer.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,54 @@
11
package monitoring
22

3-
import "time"
3+
import (
4+
"time"
5+
)
46

5-
// Timer* functions are meant to be used with defer statement to measure runtime of given function:
6-
// defer TimerNS(&metric)()
7-
func TimerMS(metric *Metric) func() {
8-
start := time.Now()
9-
return func() {
10-
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
11-
}
7+
type TimeResolution int
8+
9+
const (
10+
Millisecond TimeResolution = iota
11+
Nanosecond
12+
)
13+
14+
// Timer function is meant to be used with defer statement to measure runtime of given function:
15+
// defer Timer(&metric, Milliseconds)()
16+
func Timer(metric *Metric, unit TimeResolution) func() {
17+
return timer(metric, unit, false, false)
1218
}
1319

14-
func TimerNS(metric *Metric) func() {
20+
// Timer function is meant to be used with defer statement to measure runtime of given function:
21+
// defer Timer(&metric, Milliseconds)()
22+
// sends measured value as Send(metric)
23+
func TimerSendSingle(metric *Metric, unit TimeResolution) func() {
24+
return timer(metric, unit, true, false)
25+
}
26+
27+
// Timer function is meant to be used with defer statement to measure runtime of given function:
28+
// defer Timer(&metric, Milliseconds)()
29+
// sends measured value as SendHistogrammable(metric)
30+
func TimerSendHist(metric *Metric, unit TimeResolution) func() {
31+
return timer(metric, unit, true, true)
32+
}
33+
34+
func timer(metric *Metric, unit TimeResolution, send bool, sendHistogrammable bool) func() {
1535
start := time.Now()
36+
1637
return func() {
17-
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
38+
dur := time.Since(start)
39+
// we are setting default value as Nanoseconds
40+
if unit == Millisecond {
41+
metric.SetFieldInt64("execution_time_ms", dur.Milliseconds())
42+
} else {
43+
metric.SetFieldInt64("execution_time_ns", dur.Nanoseconds())
44+
}
45+
46+
if send {
47+
if sendHistogrammable {
48+
SendHistogrammable(metric)
49+
} else {
50+
Send(metric)
51+
}
52+
}
1853
}
1954
}

common/protos/common.pb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)