From e113faea28548884f4a4cadacdd1631593a1df4c Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Mon, 22 May 2023 22:23:24 -0600 Subject: [PATCH 1/8] Added current_time_epoch_ms and changes calculation for timestamp for ms --- traffic_monitor/cache/cache.go | 4 ++++ traffic_monitor/cache/stats_over_http.json | 1 + traffic_monitor/health/cache.go | 4 ++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go index 339c4a5f5c..da98a1cd74 100644 --- a/traffic_monitor/cache/cache.go +++ b/traffic_monitor/cache/cache.go @@ -300,6 +300,10 @@ func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime t } stats, miscStats, err := decoder.Parse(result.ID, rdr, pollCtx) + fmt.Println(miscStats["current_time_epoch_ms"]) + //if val, ok := (miscStats["current_time_epoch_ms"]); ok { + // result.Time = val + //} if err != nil { log.Warnf("%s decode error '%v'", id, err) result.Error = err diff --git a/traffic_monitor/cache/stats_over_http.json b/traffic_monitor/cache/stats_over_http.json index 70cce951b4..a7d8ed4b20 100644 --- a/traffic_monitor/cache/stats_over_http.json +++ b/traffic_monitor/cache/stats_over_http.json @@ -533,6 +533,7 @@ "plugin.system_stats.net.docker0.rx_length_errors": "0", "proxy.process.cache.volume_0.span.offline": "0", "proxy.process.cache.volume_0.span.online": "0", + "current_time_epoch_ms": "1684784878894", "server": "10.0.0" } } diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go index 94489884db..f39569e06e 100644 --- a/traffic_monitor/health/cache.go +++ b/traffic_monitor/health/cache.go @@ -115,7 +115,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic } if prevResult != nil && prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { - elapsedTimeInSecs := float64(newResult.Time.UnixNano()-prevResult.Time.UnixNano()) / 1000000000 + elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 ifaceVitals.KbpsOut = int64(float64((ifaceVitals.BytesOut-prevResult.InterfaceVitals[ifaceName].BytesOut)*8/1000) / elapsedTimeInSecs) } newResult.InterfaceVitals[ifaceName] = ifaceVitals @@ -128,7 +128,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic } if prevResult != nil && prevResult.Vitals.BytesOut != 0 { - elapsedTimeInSecs := float64(newResult.Time.UnixNano()-prevResult.Time.UnixNano()) / 1000000000 + elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 newResult.Vitals.KbpsOut = int64(float64((newResult.Vitals.BytesOut-prevResult.Vitals.BytesOut)*8/1000) / elapsedTimeInSecs) } From 00ccac0d3b37decba7ae119532c067dcde79f2c0 Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 24 May 2023 09:19:24 -0600 Subject: [PATCH 2/8] assigning current_time_epoch_ms to result.Time in Handler --- traffic_monitor/cache/cache.go | 13 ++++++--- traffic_monitor/cache/cache_test.go | 41 +++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go index da98a1cd74..af5229999d 100644 --- a/traffic_monitor/cache/cache.go +++ b/traffic_monitor/cache/cache.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "regexp" + "strconv" "time" "github.com/apache/trafficcontrol/lib/go-log" @@ -300,10 +301,14 @@ func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime t } stats, miscStats, err := decoder.Parse(result.ID, rdr, pollCtx) - fmt.Println(miscStats["current_time_epoch_ms"]) - //if val, ok := (miscStats["current_time_epoch_ms"]); ok { - // result.Time = val - //} + if val, ok := miscStats["current_time_epoch_ms"]; ok { + valString := fmt.Sprintf("%s", val) + valInt, err := strconv.ParseInt(valString, 10, 64) + if err != nil { + log.Errorf("parse error '%v'", err) + } + result.Time = time.UnixMilli(valInt) + } if err != nil { log.Warnf("%s decode error '%v'", id, err) result.Error = err diff --git a/traffic_monitor/cache/cache_test.go b/traffic_monitor/cache/cache_test.go index b20a392507..15618d06ad 100644 --- a/traffic_monitor/cache/cache_test.go +++ b/traffic_monitor/cache/cache_test.go @@ -20,11 +20,15 @@ package cache */ import ( - "testing" - + "bytes" + "fmt" "github.com/apache/trafficcontrol/lib/go-tc" "github.com/apache/trafficcontrol/lib/go-util" + "github.com/apache/trafficcontrol/traffic_monitor/poller" "github.com/apache/trafficcontrol/traffic_monitor/todata" + "io/ioutil" + "net/http" + "testing" ) func TestHandlerPrecompute(t *testing.T) { @@ -95,3 +99,36 @@ func TestComputeStatGbps(t *testing.T) { } } } + +func TestParseAndDecode(t *testing.T) { + file, err := ioutil.ReadFile("stats_over_http.json") + if err != nil { + t.Fatal(err) + } + + pl := &poller.HTTPPollCtx{HTTPHeader: http.Header{}} + ctx := interface{}(pl) + ctx.(*poller.HTTPPollCtx).HTTPHeader.Set("Content-Type", "text/json") + + decoder, err := GetDecoder("stats_over_http") + if err != nil { + t.Errorf("decoder error, expected: nil, got: %v", err) + } + + _, miscStats, err := decoder.Parse("1", bytes.NewReader(file), ctx) + if err != nil { + t.Errorf("decoder parse error, expected: nil, got: %v", err) + } + + if len(miscStats) < 1 { + t.Errorf("empty miscStats structure") + } + + if val, ok := miscStats["current_time_epoch_ms"]; ok { + valString := fmt.Sprintf("%s", val) + if valString != "1684784878894" { + t.Errorf("unable to read `current_time_epoch_ms`") + } + } + +} From 2981a6ea0f0fb72728f0a039839a23bc3aee7284 Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 24 May 2023 09:44:36 -0600 Subject: [PATCH 3/8] updated CHANGELOG.md --- CHANGELOG.md | 1 + traffic_monitor/cache/cache_test.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0418990302..b5b7bde146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - [#7469](https://github.com/apache/trafficcontrol/pull/7469) *Traffic Ops* Changed logic to not report empty or missing cookies into TO error.log. ### Fixed +- [#7539](https://github.com/apache/trafficcontrol/pull/7539) *Traffic Monitor* Use stats_over_http timestamp to calculate bandwidth for TM's health. - [#7542](https://github.com/apache/trafficcontrol/pull/7542) *Traffic Ops* Fixed `CDN Locks` documentation to reflect the correct RFC3339 timestamps. - [#6340](https://github.com/apache/trafficcontrol/issues/6340) *Traffic Ops* Fixed alert messages for POST and PUT invalidation job APIs. - [#7511](https://github.com/apache/trafficcontrol/pull/7511) *Traffic Ops* Fixed the changelog registration message to include the username instead of duplicate email entry. diff --git a/traffic_monitor/cache/cache_test.go b/traffic_monitor/cache/cache_test.go index 15618d06ad..588e14d1a5 100644 --- a/traffic_monitor/cache/cache_test.go +++ b/traffic_monitor/cache/cache_test.go @@ -130,5 +130,4 @@ func TestParseAndDecode(t *testing.T) { t.Errorf("unable to read `current_time_epoch_ms`") } } - } From 29faeab4cc773e122d0effdb69208fd45e5844c3 Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Thu, 25 May 2023 10:02:33 -0600 Subject: [PATCH 4/8] Added check for elapsedTime. --- traffic_monitor/health/cache.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go index f39569e06e..5e73f3933f 100644 --- a/traffic_monitor/health/cache.go +++ b/traffic_monitor/health/cache.go @@ -116,6 +116,9 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic if prevResult != nil && prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 + if elapsedTimeInSecs == 0 { + elapsedTimeInSecs = float64(prevResult.Time.UnixMilli() / 1000) + } ifaceVitals.KbpsOut = int64(float64((ifaceVitals.BytesOut-prevResult.InterfaceVitals[ifaceName].BytesOut)*8/1000) / elapsedTimeInSecs) } newResult.InterfaceVitals[ifaceName] = ifaceVitals @@ -129,6 +132,9 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic if prevResult != nil && prevResult.Vitals.BytesOut != 0 { elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 + if elapsedTimeInSecs == 0 { + elapsedTimeInSecs = float64(prevResult.Time.UnixMilli() / 1000) + } newResult.Vitals.KbpsOut = int64(float64((newResult.Vitals.BytesOut-prevResult.Vitals.BytesOut)*8/1000) / elapsedTimeInSecs) } From 0606e7dd6d298b48a68878249562520999ad7dc8 Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 31 May 2023 13:46:52 -0600 Subject: [PATCH 5/8] Addressed review comments. --- traffic_monitor/cache/cache.go | 19 +++++++++++-------- traffic_monitor/health/cache.go | 21 +++++++++++---------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go index af5229999d..cd91302766 100644 --- a/traffic_monitor/cache/cache.go +++ b/traffic_monitor/cache/cache.go @@ -301,20 +301,23 @@ func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime t } stats, miscStats, err := decoder.Parse(result.ID, rdr, pollCtx) - if val, ok := miscStats["current_time_epoch_ms"]; ok { - valString := fmt.Sprintf("%s", val) - valInt, err := strconv.ParseInt(valString, 10, 64) - if err != nil { - log.Errorf("parse error '%v'", err) - } - result.Time = time.UnixMilli(valInt) - } if err != nil { log.Warnf("%s decode error '%v'", id, err) result.Error = err handler.resultChan <- result return } + if val, ok := miscStats["current_time_epoch_ms"]; ok { + valString := fmt.Sprintf("%s", val) + valInt, valErr := strconv.ParseInt(valString, 10, 64) + if err != nil { + log.Errorf("parse error '%v'", valErr) + result.Error = valErr + handler.resultChan <- result + return + } + result.Time = time.UnixMilli(valInt) + } if value, ok := miscStats[rfc.Via]; ok { result.ID = fmt.Sprintf("%v", value) } diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go index 5e73f3933f..dee13e2633 100644 --- a/traffic_monitor/health/cache.go +++ b/traffic_monitor/health/cache.go @@ -60,6 +60,7 @@ func (t Threshold) String() string { // GetVitals Gets the vitals to decide health on in the right format func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.TrafficMonitorConfigMap) { + var elapsedTimeInSecs float64 if newResult.Error != nil { log.Errorf("cache_health.GetVitals() called with an errored Result!") return @@ -87,6 +88,14 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic return } + if prevResult != nil { + elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 + if elapsedTimeInSecs <= 0 { + newResult = prevResult + return + } + } + var monitoredInterfaces []tc.ServerInterfaceInfo for _, srvrIfaceInfo := range mc.TrafficServer[newResult.ID].Interfaces { if srvrIfaceInfo.Monitor { @@ -114,11 +123,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic MaxKbpsOut: iface.Speed * 1000, } - if prevResult != nil && prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { - elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 - if elapsedTimeInSecs == 0 { - elapsedTimeInSecs = float64(prevResult.Time.UnixMilli() / 1000) - } + if prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { ifaceVitals.KbpsOut = int64(float64((ifaceVitals.BytesOut-prevResult.InterfaceVitals[ifaceName].BytesOut)*8/1000) / elapsedTimeInSecs) } newResult.InterfaceVitals[ifaceName] = ifaceVitals @@ -130,11 +135,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic newResult.Vitals.MaxKbpsOut += iface.Speed * 1000 } - if prevResult != nil && prevResult.Vitals.BytesOut != 0 { - elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 - if elapsedTimeInSecs == 0 { - elapsedTimeInSecs = float64(prevResult.Time.UnixMilli() / 1000) - } + if prevResult.Vitals.BytesOut != 0 { newResult.Vitals.KbpsOut = int64(float64((newResult.Vitals.BytesOut-prevResult.Vitals.BytesOut)*8/1000) / elapsedTimeInSecs) } From c97325bc2e714a93cf04439324e280fb463ca7f3 Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 31 May 2023 15:49:57 -0600 Subject: [PATCH 6/8] Added prevResult check back --- traffic_monitor/health/cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go index dee13e2633..4e79c9912b 100644 --- a/traffic_monitor/health/cache.go +++ b/traffic_monitor/health/cache.go @@ -89,7 +89,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic } if prevResult != nil { - elapsedTimeInSecs := float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 + elapsedTimeInSecs = float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 if elapsedTimeInSecs <= 0 { newResult = prevResult return @@ -123,7 +123,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic MaxKbpsOut: iface.Speed * 1000, } - if prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { + if prevResult != nil && prevResult.InterfaceVitals != nil && prevResult.InterfaceVitals[ifaceName].BytesOut != 0 { ifaceVitals.KbpsOut = int64(float64((ifaceVitals.BytesOut-prevResult.InterfaceVitals[ifaceName].BytesOut)*8/1000) / elapsedTimeInSecs) } newResult.InterfaceVitals[ifaceName] = ifaceVitals @@ -135,7 +135,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic newResult.Vitals.MaxKbpsOut += iface.Speed * 1000 } - if prevResult.Vitals.BytesOut != 0 { + if prevResult != nil && prevResult.Vitals.BytesOut != 0 { newResult.Vitals.KbpsOut = int64(float64((newResult.Vitals.BytesOut-prevResult.Vitals.BytesOut)*8/1000) / elapsedTimeInSecs) } From 59fd913e9cf22be41525d45c9d16e4156a6595bf Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 31 May 2023 16:37:56 -0600 Subject: [PATCH 7/8] pointer assignment for structure. --- traffic_monitor/health/cache.go | 2 +- traffic_monitor/health/cache_test.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go index 4e79c9912b..53f932285c 100644 --- a/traffic_monitor/health/cache.go +++ b/traffic_monitor/health/cache.go @@ -91,7 +91,7 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *tc.Traffic if prevResult != nil { elapsedTimeInSecs = float64(newResult.Time.UnixMilli()-prevResult.Time.UnixMilli()) / 1000 if elapsedTimeInSecs <= 0 { - newResult = prevResult + *newResult = *prevResult return } } diff --git a/traffic_monitor/health/cache_test.go b/traffic_monitor/health/cache_test.go index 847d82931a..3b3c151eb5 100644 --- a/traffic_monitor/health/cache_test.go +++ b/traffic_monitor/health/cache_test.go @@ -310,6 +310,14 @@ func TestDualHomingMonitoredInterfacesGetVitals(t *testing.T) { if firstResult.Vitals != expectedFirstVitals { t.Errorf("Vitals do not match expected output. expected: %v actual: %v:", expectedFirstVitals, firstResult.Vitals) } + + //Test if elapsedTimeInSecs == 0 + secondResult.Time = firstResult.Time + GetVitals(&secondResult, &firstResult, &tmcm) + if firstResult.Statistics.Interfaces["bond0"] != secondResult.Statistics.Interfaces["bond0"] { + t.Errorf("Load avg statistics do not match. expected: %v, got: %v", firstResult.Statistics.Interfaces["bond0"], secondResult.Statistics.Interfaces["bond0"]) + } + } func TestCalcAvailabilityThresholds(t *testing.T) { From 1061f9ca4c1f8a457fc7f32a380b6443bf12505c Mon Sep 17 00:00:00 2001 From: Rima Shah Date: Wed, 31 May 2023 16:48:09 -0600 Subject: [PATCH 8/8] updated cache unite test with another check. --- traffic_monitor/cache/cache.go | 2 +- traffic_monitor/cache/cache_test.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go index cd91302766..4d443c4088 100644 --- a/traffic_monitor/cache/cache.go +++ b/traffic_monitor/cache/cache.go @@ -310,7 +310,7 @@ func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime t if val, ok := miscStats["current_time_epoch_ms"]; ok { valString := fmt.Sprintf("%s", val) valInt, valErr := strconv.ParseInt(valString, 10, 64) - if err != nil { + if valErr != nil { log.Errorf("parse error '%v'", valErr) result.Error = valErr handler.resultChan <- result diff --git a/traffic_monitor/cache/cache_test.go b/traffic_monitor/cache/cache_test.go index 588e14d1a5..077592ffca 100644 --- a/traffic_monitor/cache/cache_test.go +++ b/traffic_monitor/cache/cache_test.go @@ -129,5 +129,7 @@ func TestParseAndDecode(t *testing.T) { if valString != "1684784878894" { t.Errorf("unable to read `current_time_epoch_ms`") } + } else { + t.Errorf("current_time_epoch_ms field was not found in the json file") } }