diff --git a/cmd/metrics/event_defs.go b/cmd/metrics/event_defs.go
index aff1bca6..10ae00fb 100644
--- a/cmd/metrics/event_defs.go
+++ b/cmd/metrics/event_defs.go
@@ -12,8 +12,6 @@ import (
 	"log/slog"
 	"os"
 	"path/filepath"
-	"regexp"
-	"slices"
 	"strings"
 
 	mapset "github.com/deckarep/golang-set/v2"
@@ -73,9 +71,7 @@ func LoadEventGroups(eventDefinitionOverridePath string, metadata Metadata) (gro
 		if event, err = parseEventDefinition(line[:len(line)-1]); err != nil {
 			return
 		}
-		// abbreviate the event name to shorten the eventual perf stat command line
-		event.Name = abbreviateEventName(event.Name)
-		event.Raw = abbreviateEventName(event.Raw)
+		// check if the event is collectable
 		if isCollectableEvent(event, metadata) {
 			group = append(group, event)
 		} else {
@@ -95,8 +91,6 @@ func LoadEventGroups(eventDefinitionOverridePath string, metadata Metadata) (gro
 		return
 	}
 	uncollectableEvents = uncollectable.ToSlice()
-	// expand uncore groups for all uncore devices
-	groups, err = expandUncoreGroups(groups, metadata)
 	if uncollectable.Cardinality() != 0 {
 		slog.Debug("Events not collectable on target", slog.String("events", uncollectable.String()))
 	}
@@ -104,38 +98,6 @@ func LoadEventGroups(eventDefinitionOverridePath string, metadata Metadata) (gro
 	return
 }
 
-// abbreviateEventName replaces long event names with abbreviations to reduce the length of the perf command.
-// focus is on uncore events because they are repeated for each uncore device
-func abbreviateEventName(event string) string {
-	// Abbreviations must be unique and in order. And, if replacing UNC_*, the abbreviation must begin with "UNC" because this is how we identify uncore events when collapsing them.
-	var abbreviations = [][]string{
-		{"UNC_CHA_TOR_INSERTS", "UNCCTI"},
-		{"UNC_CHA_TOR_OCCUPANCY", "UNCCTO"},
-		{"UNC_CHA_CLOCKTICKS", "UNCCCT"},
-		{"UNC_M_CAS_COUNT_SCH", "UNCMCC"},
-		{"IA_MISS_DRD_REMOTE", "IMDR"},
-		{"IA_MISS_DRD_LOCAL", "IMDL"},
-		{"IA_MISS_LLCPREFDATA", "IMLP"},
-		{"IA_MISS_LLCPREFRFO", "IMLR"},
-		{"IA_MISS_DRD_PREF_LOCAL", "IMDPL"},
-		{"IA_MISS_DRD_PREF_REMOTE", "IMDRP"},
-		{"IA_MISS_CRD_PREF", "IMCP"},
-		{"IA_MISS_RFO_PREF", "IMRP"},
-		{"IA_MISS_RFO", "IMRF"},
-		{"IA_MISS_CRD", "IMC"},
-		{"IA_MISS_DRD", "IMD"},
-		{"IO_PCIRDCUR", "IPCI"},
-		{"IO_ITOMCACHENEAR", "IITN"},
-		{"IO_ITOM", "IITO"},
-		{"IMD_OPT", "IMDO"},
-	}
-	// if an abbreviation key is found in the event, replace the matching portion of the event with the abbreviation
-	for _, abbr := range abbreviations {
-		event = strings.Replace(event, abbr[0], abbr[1], -1)
-	}
-	return event
-}
-
 // isCollectableEvent confirms if given event can be collected on the platform
 func isCollectableEvent(event EventDefinition, metadata Metadata) bool {
 	// fixed-counter TMA
@@ -250,61 +212,3 @@ func parseEventDefinition(line string) (eventDef EventDefinition, err error) {
 	}
 	return
 }
-
-// expandUncoreGroup expands a perf event group into a list of groups where each group is
-// associated with an uncore device
-func expandUncoreGroup(group GroupDefinition, ids []int, re *regexp.Regexp, vendor string) (groups []GroupDefinition, err error) {
-	for _, deviceID := range ids {
-		var newGroup GroupDefinition
-		for _, event := range group {
-			match := re.FindStringSubmatch(event.Raw)
-			if len(match) == 0 {
-				err = fmt.Errorf("unexpected raw event format: %s", event.Raw)
-				return
-			}
-			var newEvent EventDefinition
-			if vendor == "AuthenticAMD" {
-				newEvent.Name = match[4]
-				newEvent.Raw = fmt.Sprintf("amd_%s/event=%s,umask=%s,name='%s'/", match[1], match[2], match[3], newEvent.Name)
-			} else {
-				newEvent.Name = fmt.Sprintf("%s.%d", match[4], deviceID)
-				newEvent.Raw = fmt.Sprintf("uncore_%s_%d/event=%s,umask=%s,name='%s'/", match[1], deviceID, match[2], match[3], newEvent.Name)
-			}
-			newEvent.Device = event.Device
-			newGroup = append(newGroup, newEvent)
-		}
-		groups = append(groups, newGroup)
-	}
-	return
-}
-
-// expandUncoreGroups expands groups with uncore events to include events for all uncore devices
-// assumes that uncore device events are in their own groups, not mixed with other device types
-func expandUncoreGroups(groups []GroupDefinition, metadata Metadata) (expandedGroups []GroupDefinition, err error) {
-	// example 1: cha/event=0x35,umask=0xc80ffe01,name='UNC_CHA_TOR_INSERTS.IA_MISS_CRD'/,
-	// expand to: uncore_cha_0/event=0x35,umask=0xc80ffe01,name='UNC_CHA_TOR_INSERTS.IA_MISS_CRD.0'/,
-	// example 2: cha/event=0x36,umask=0x21,config1=0x4043300000000,name='UNC_CHA_TOR_OCCUPANCY.IA_MISS.0x40433'/
-	// expand to: uncore_cha_0/event=0x36,umask=0x21,config1=0x4043300000000,name='UNC_CHA_TOR_OCCUPANCY.IA_MISS.0x40433'/
-	re := regexp.MustCompile(`(\w+)/event=(0x[0-9,a-f,A-F]+),umask=(0x[0-9,a-f,A-F]+.*),name='(.*)'`)
-	var deviceTypes []string
-	for deviceType := range metadata.UncoreDeviceIDs {
-		deviceTypes = append(deviceTypes, deviceType)
-	}
-	for _, group := range groups {
-		device := group[0].Device
-		if slices.Contains(deviceTypes, device) {
-			var newGroups []GroupDefinition
-			if len(metadata.UncoreDeviceIDs[device]) == 0 {
-				slog.Warn("No uncore devices found", slog.String("type", device))
-				continue
-			}
-			if newGroups, err = expandUncoreGroup(group, metadata.UncoreDeviceIDs[device], re, metadata.Vendor); err != nil {
-				return
-			}
-			expandedGroups = append(expandedGroups, newGroups...)
-		} else {
-			expandedGroups = append(expandedGroups, group)
-		}
-	}
-	return
-}
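Note on the deletions above: with abbreviateEventName and expandUncoreGroups gone, event definitions keep their original names and their device-type raw form (e.g. cha/...), and the per-device uncore handling moves to collection time in event_frame.go. For reference, the removed expansion is easy to reproduce in isolation; the sketch below reuses the regex and output format from the deleted code, and the CHA device IDs {0, 1, 2} are hypothetical stand-ins for metadata.UncoreDeviceIDs.

	package main

	import (
		"fmt"
		"regexp"
	)

	func main() {
		// regex and raw-event syntax taken from the deleted expandUncoreGroups
		re := regexp.MustCompile(`(\w+)/event=(0x[0-9,a-f,A-F]+),umask=(0x[0-9,a-f,A-F]+.*),name='(.*)'`)
		raw := "cha/event=0x35,umask=0xc80ffe01,name='UNC_CHA_TOR_INSERTS.IA_MISS_CRD'/"
		match := re.FindStringSubmatch(raw)
		if match == nil {
			fmt.Println("unexpected raw event format")
			return
		}
		for _, deviceID := range []int{0, 1, 2} { // hypothetical CHA device IDs
			// one event per uncore device, with the device ID appended to the name
			name := fmt.Sprintf("%s.%d", match[4], deviceID)
			fmt.Printf("uncore_%s_%d/event=%s,umask=%s,name='%s'/\n", match[1], deviceID, match[2], match[3], name)
		}
	}

Each definition used to fan out into uncore_cha_0/..., uncore_cha_1/..., and so on; removing the fan-out keeps the perf command line short, which is presumably why the name abbreviations could be dropped as well.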
diff --git a/cmd/metrics/event_frame.go b/cmd/metrics/event_frame.go
index 5becff11..2db95cfc 100644
--- a/cmd/metrics/event_frame.go
+++ b/cmd/metrics/event_frame.go
@@ -65,16 +65,32 @@ type Event struct {
 func GetEventFrames(rawEvents [][]byte, eventGroupDefinitions []GroupDefinition, scope string, granularity string, metadata Metadata) (eventFrames []EventFrame, err error) {
 	// parse raw events into list of Event
 	var allEvents []Event
-	if allEvents, err = parseEvents(rawEvents, eventGroupDefinitions); err != nil {
+	if allEvents, err = parseEvents(rawEvents); err != nil {
 		return
 	}
-	// coalesce events to one or more lists based on scope and granularity
-	var coalescedEvents [][]Event
-	if coalescedEvents, err = coalesceEvents(allEvents, scope, granularity, metadata); err != nil {
+
+	// bucket events into one or more lists based on scope and granularity
+	var bucketedEvents [][]Event
+	if bucketedEvents, err = bucketEvents(allEvents, scope, granularity, metadata); err != nil {
 		return
 	}
+
+	// aggregate uncore events
+	var aggregatedEvents [][]Event
+	if aggregatedEvents, err = aggregateUncoreEvents(bucketedEvents); err != nil {
+		return nil, fmt.Errorf("failed to aggregate uncore events: %w", err)
+	}
+
+	// assign events to groups based on event group definitions
+	for _, events := range aggregatedEvents {
+		err = assignEventsToGroups(events, eventGroupDefinitions)
+		if err != nil {
+			return
+		}
+	}
+
 	// create one EventFrame per list of Events
-	for _, events := range coalescedEvents {
+	for _, events := range aggregatedEvents {
 		// organize events into groups
 		group := EventGroup{EventValues: make(map[string]float64)}
 		var lastGroupID int
@@ -104,26 +120,20 @@ func GetEventFrames(rawEvents [][]byte, eventGroupDefinitions []GroupDefinition,
 		}
 		// add the last group
 		eventFrame.EventGroups = append(eventFrame.EventGroups, group)
-		// TODO: can we collapse uncore groups as we're parsing (above)?
-		if eventFrame, err = collapseUncoreGroupsInFrame(eventFrame); err != nil {
-			return
-		}
 		eventFrames = append(eventFrames, eventFrame)
 	}
 	return
 }
 
 // parseEvents parses the raw event data into a list of Event
-func parseEvents(rawEvents [][]byte, eventGroupDefinitions []GroupDefinition) ([]Event, error) {
+func parseEvents(rawEvents [][]byte) ([]Event, error) {
 	events := make([]Event, 0, len(rawEvents))
-	groupIdx := 0
-	eventIdx := -1
-	previousEvent := ""
 	var eventsNotCounted []string
 	var eventsNotSupported []string
 	for _, rawEvent := range rawEvents {
-		event, err := parseEventJSON(rawEvent) // nosemgrep
-		if err != nil {
+		var event Event
+		if err := json.Unmarshal(rawEvent, &event); err != nil {
+			err = fmt.Errorf("unrecognized event format: %w", err)
 			slog.Error(err.Error(), slog.String("event", string(rawEvent)))
 			return nil, err
 		}
@@ -136,24 +146,13 @@ func parseEvents(rawEvents [][]byte, eventGroupDefinitions []GroupDefinition) ([
 			slog.Debug("event not supported", slog.String("event", string(rawEvent)))
 			eventsNotSupported = append(eventsNotSupported, event.Event)
 			event.Value = math.NaN()
-		}
-		if event.Event != previousEvent {
-			eventIdx++
-			previousEvent = event.Event
-		}
-		if eventIdx == len(eventGroupDefinitions[groupIdx]) { // last event in group
-			groupIdx++
-			if groupIdx == len(eventGroupDefinitions) {
-				// if in cgroup scope, we receive one set of events for each cgroup
-				if flagScope == scopeCgroup {
-					groupIdx = 0
-				} else {
-					return nil, fmt.Errorf("event group definitions not aligning with raw events")
-				}
+		default:
+			var err error
+			if event.Value, err = strconv.ParseFloat(event.CounterValue, 64); err != nil {
+				slog.Error("failed to parse event value", slog.String("event", event.Event), slog.String("value", event.CounterValue))
+				event.Value = math.NaN()
 			}
-			eventIdx = 0
 		}
-		event.Group = groupIdx
 		events = append(events, event)
 	}
 	if len(eventsNotCounted) > 0 {
@@ -165,13 +164,79 @@ func parseEvents(rawEvents [][]byte, eventGroupDefinitions []GroupDefinition) ([
 	return events, nil
 }
 
-// coalesceEvents separates the events into a number of event lists by granularity and scope
-func coalesceEvents(allEvents []Event, scope string, granularity string, metadata Metadata) (coalescedEvents [][]Event, err error) {
+// aggregateUncoreEvents sums the values of uncore events that share a name and interval,
+// removing the duplicates so that only one event per name and interval remains.
+func aggregateUncoreEvents(bucketedEvents [][]Event) ([][]Event, error) {
+	if flagGranularity != granularitySocket {
+		return bucketedEvents, nil // per-device uncore events are only present at socket granularity
+	}
+	for bucketIdx, events := range bucketedEvents {
+		if len(events) == 0 {
+			continue
+		}
+		// Use a map to track unique uncore events by (event_name, interval)
+		uncoreMap := make(map[string]int) // key: "event_name:interval" -> index in filtered slice
+		var filteredEvents []Event
+		for _, event := range events {
+			if strings.HasPrefix(event.Event, "UNC") {
+				// Create a unique key for this uncore event
+				key := fmt.Sprintf("%s:%.9f", event.Event, event.Interval)
+				// Check if this uncore event is already in the map
+				if existingIdx, exists := uncoreMap[key]; exists {
+					// Aggregate with the existing event
+					filteredEvents[existingIdx].Value += event.Value
+				} else {
+					// Add a new unique uncore event
+					uncoreMap[key] = len(filteredEvents)
+					filteredEvents = append(filteredEvents, event)
+				}
+			} else {
+				// Keep non-uncore events as-is
+				filteredEvents = append(filteredEvents, event)
+			}
+		}
+		// Update the original slice
+		bucketedEvents[bucketIdx] = filteredEvents
+	}
+	return bucketedEvents, nil
+}
+
+// assignEventsToGroups assigns each event to a group based on the event group definitions.
+// It modifies the events in place by setting the Group field of each event.
+func assignEventsToGroups(events []Event, eventGroupDefinitions []GroupDefinition) error {
+	if len(events) == 0 {
+		return fmt.Errorf("no events to assign to groups")
+	}
+	if len(eventGroupDefinitions) == 0 {
+		return fmt.Errorf("no event group definitions provided")
+	}
+	groupIdx := 0
+	eventIdx := -1
+	previousEvent := ""
+	for i := range events {
+		if events[i].Event != previousEvent {
+			eventIdx++
+			previousEvent = events[i].Event
+		}
+		if eventIdx == len(eventGroupDefinitions[groupIdx]) { // last event in group
+			groupIdx++
+			if groupIdx == len(eventGroupDefinitions) {
+				return fmt.Errorf("event group definitions not aligning with raw events")
+			}
+			eventIdx = 0
+		}
+		events[i].Group = groupIdx
+	}
+	return nil
+}
+
+// bucketEvents separates the events into a number of event lists by granularity and scope
+func bucketEvents(allEvents []Event, scope string, granularity string, metadata Metadata) (bucketedEvents [][]Event, err error) {
 	switch scope {
 	case scopeSystem:
 		switch granularity {
 		case granularitySystem:
-			coalescedEvents = append(coalescedEvents, allEvents)
+			bucketedEvents = append(bucketedEvents, allEvents)
 			return
 		case granularitySocket:
 			// create one list of Events per Socket
@@ -210,7 +275,7 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
 					return
 				}
 			}
-			coalescedEvents = append(coalescedEvents, newEvents...)
+			bucketedEvents = append(bucketedEvents, newEvents...)
 			return
 		case granularityCPU:
 			// create one list of Events per CPU
@@ -250,36 +315,37 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
 			for idx, cpuID := range cpuIDs {
 				cpuMap[cpuID] = idx
 			}
-			// note: if some cores have been off-lined, this may cause an issue because 'perf' seems
-			// to still report events for those cores
+
+			// create a new list of events, one for each CPU
 			newEvents := make([][]Event, numCPUs)
 			for i := range numCPUs {
 				newEvents[i] = make([]Event, 0, len(allEvents)/numCPUs)
 			}
+
+			// iterate over all events and place them into the newEvents list
 			for _, event := range allEvents {
 				var cpu int
 				if cpu, err = strconv.Atoi(event.CPU); err != nil {
					return
 				}
-				// handle case where perf returns events for off-lined cores
-				if cpu > len(newEvents)-1 {
-					cpusToAdd := len(newEvents) + 1 - cpu
-					for range cpusToAdd {
-						newEvents = append(newEvents, make([]Event, 0, len(allEvents)/numCPUs))
-					}
+				// check if the CPU is in the cpuMap
+				if _, ok := cpuMap[cpu]; !ok {
+					slog.Debug("cpu not found in cpu map, skipping event", slog.String("cpu", event.CPU), slog.String("event", event.Event))
+					continue
 				}
-				// place the event for the current CPU into the newEvents list
 				// cpuMap ensures that events are placed in a valid index
 				newEvents[cpuMap[cpu]] = append(newEvents[cpuMap[cpu]], event)
 			}
-			coalescedEvents = append(coalescedEvents, newEvents...)
+
+			// append the per-CPU event lists to the bucketed events
+			bucketedEvents = append(bucketedEvents, newEvents...)
 
 		default:
 			err = fmt.Errorf("unsupported granularity: %s", granularity)
 			return
 		}
 	case scopeProcess:
-		coalescedEvents = append(coalescedEvents, allEvents)
+		bucketedEvents = append(bucketedEvents, allEvents)
 		return
 	case scopeCgroup:
 		// expand events list to one list per cgroup
@@ -294,7 +360,7 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
 			}
 			allCgroupEvents[cgroupIdx] = append(allCgroupEvents[cgroupIdx], event)
 		}
-		coalescedEvents = append(coalescedEvents, allCgroupEvents...)
+		bucketedEvents = append(bucketedEvents, allCgroupEvents...)
 	default:
 		err = fmt.Errorf("unsupported scope: %s", scope)
 		return
@@ -302,130 +368,6 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
 	return
 }
 
-// collapseUncoreGroupsInFrame merges repeated (per-device) uncore groups into a single
-// group by summing the values for events that only differ by device ID.
-//
-// uncore events are received in repeated perf groups like this:
-// group:
-//	5.005032332,49,,UNC_CHA_TOR_INSERTS.IA_MISS_CRD.0,2806917160,25.00,,
-//	5.005032332,2720,,UNC_CHA_TOR_INSERTS.IA_MISS_DRD_REMOTE.0,2806917160,25.00,,
-//	5.005032332,1061494,,UNC_CHA_TOR_OCCUPANCY.IA_MISS_DRD_REMOTE.0,2806917160,25.00,,
-// group:
-//	5.005032332,49,,UNC_CHA_TOR_INSERTS.IA_MISS_CRD.1,2806585867,25.00,,
-//	5.005032332,2990,,UNC_CHA_TOR_INSERTS.IA_MISS_DRD_REMOTE.1,2806585867,25.00,,
-//	5.005032332,1200063,,UNC_CHA_TOR_OCCUPANCY.IA_MISS_DRD_REMOTE.1,2806585867,25.00,,
-//
-// For the example above, we will have this:
-//	5.005032332,98,,UNC_CHA_TOR_INSERTS.IA_MISS_CRD,2806585867,25.00,,
-//	5.005032332,5710,,UNC_CHA_TOR_INSERTS.IA_MISS_DRD_REMOTE,2806585867,25.00,,
-//	5.005032332,2261557,,UNC_CHA_TOR_OCCUPANCY.IA_MISS_DRD_REMOTE,2806585867,25.00,,
-// Note: uncore event names start with "UNC"
-// Note: we assume that uncore events are not mixed into groups that have other event types, e.g., cpu events
-func collapseUncoreGroupsInFrame(inFrame EventFrame) (outFrame EventFrame, err error) {
-	outFrame = inFrame
-	outFrame.EventGroups = []EventGroup{}
-	var idxUncoreMatches []int
-	for inGroupIdx, inGroup := range inFrame.EventGroups {
-		// skip groups that have been collapsed
-		if slices.Contains(idxUncoreMatches, inGroupIdx) {
-			continue
-		}
-		idxUncoreMatches = []int{}
-		foundUncore := false
-		for eventName := range inGroup.EventValues {
-			// only check the first entry
-			if strings.HasPrefix(eventName, "UNC") {
-				foundUncore = true
-			}
-			break
-		}
-		if foundUncore {
-			// we need to know how many of the following groups (if any) match the current group
-			// so they can be merged together into a single group
-			for i := inGroupIdx + 1; i < len(inFrame.EventGroups); i++ {
-				if isMatchingGroup(inGroup, inFrame.EventGroups[i]) {
-					// keep track of the groups that match so we can skip processing them since
-					// they will be merged into a single group
-					idxUncoreMatches = append(idxUncoreMatches, i)
-				} else {
-					break
-				}
-			}
-			var outGroup EventGroup
-			if outGroup, err = collapseUncoreGroups(inFrame.EventGroups, inGroupIdx, len(idxUncoreMatches)); err != nil {
-				return
-			}
-			outFrame.EventGroups = append(outFrame.EventGroups, outGroup)
-		} else {
-			outFrame.EventGroups = append(outFrame.EventGroups, inGroup)
-		}
-	}
-	return
-}
-
-// isMatchingGroup - groups are considered matching if they include the same event names (ignoring .ID suffix)
-func isMatchingGroup(groupA, groupB EventGroup) bool {
-	if len(groupA.EventValues) != len(groupB.EventValues) {
-		return false
-	}
-	aNames := make([]string, 0, len(groupA.EventValues))
-	bNames := make([]string, 0, len(groupB.EventValues))
-	for eventAName := range groupA.EventValues {
-		parts := strings.Split(eventAName, ".")
-		newName := strings.Join(parts[:len(parts)-1], ".")
-		aNames = append(aNames, newName)
-	}
-	for eventBName := range groupB.EventValues {
-		parts := strings.Split(eventBName, ".")
-		newName := strings.Join(parts[:len(parts)-1], ".")
-		bNames = append(bNames, newName)
-	}
-	slices.Sort(aNames)
-	slices.Sort(bNames)
-	for nameIdx, name := range aNames {
-		if name != bNames[nameIdx] {
-			return false
-		}
-	}
-	return true
-}
-
-// collapseUncoreGroups collapses a list of groups into a single group
-func collapseUncoreGroups(inGroups []EventGroup, firstIdx int, count int) (outGroup EventGroup, err error) {
-	outGroup.GroupID = inGroups[firstIdx].GroupID
-	outGroup.Percentage = inGroups[firstIdx].Percentage
-	outGroup.EventValues = make(map[string]float64)
-	for i := firstIdx; i <= firstIdx+count; i++ {
-		for name, value := range inGroups[i].EventValues {
-			parts := strings.Split(name, ".")
-			newName := strings.Join(parts[:len(parts)-1], ".")
-			if _, ok := outGroup.EventValues[newName]; !ok {
-				outGroup.EventValues[newName] = 0
-			}
-			outGroup.EventValues[newName] += value
-		}
-	}
-	return
-}
-
-// parseEventJSON parses JSON formatted event into struct
-// example: {"interval" : 5.005113019, "cpu": "0", "counter-value" : "22901873.000000", "unit" : "", "cgroup" : "...1cb2de.scope", "event" : "L1D.REPLACEMENT", "event-runtime" : 80081151765, "pcnt-running" : 6.00, "metric-value" : 0.000000, "metric-unit" : "(null)"}
-func parseEventJSON(rawEvent []byte) (Event, error) {
-	var event Event
-	if err := json.Unmarshal(rawEvent, &event); err != nil {
-		err = fmt.Errorf("unrecognized event format")
-		return event, err
-	}
-	if !strings.Contains(event.CounterValue, "not counted") && !strings.Contains(event.CounterValue, "not supported") {
-		var err error
-		if event.Value, err = strconv.ParseFloat(event.CounterValue, 64); err != nil {
-			slog.Error("failed to parse event value", slog.String("event", event.Event), slog.String("value", event.CounterValue))
-			event.Value = math.NaN()
-		}
-	}
-	return event, nil
-}
-
 // extractInterval parses the interval value from a JSON perf event line
 // Returns the interval as a float64, or -1 if parsing fails
 func extractInterval(line []byte) float64 {
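The collapse pass deleted above is replaced by aggregateUncoreEvents, which performs the same summing while the events are still flat lists. The keying is easiest to see in isolation. In this sketch the Event struct is trimmed to the fields involved, and the sample values (49 + 49 = 98) come from the UNC_CHA_TOR_INSERTS.IA_MISS_CRD example in the deleted comment.

	package main

	import (
		"fmt"
		"strings"
	)

	// trimmed stand-in for the real Event struct
	type Event struct {
		Event    string
		Interval float64
		Value    float64
	}

	// condensed version of the aggregateUncoreEvents keying: uncore events
	// (names prefixed "UNC") that share a name and interval are summed into
	// their first occurrence; everything else passes through untouched
	func aggregate(events []Event) []Event {
		uncoreMap := make(map[string]int) // "event_name:interval" -> index in filtered
		var filtered []Event
		for _, e := range events {
			if !strings.HasPrefix(e.Event, "UNC") {
				filtered = append(filtered, e)
				continue
			}
			key := fmt.Sprintf("%s:%.9f", e.Event, e.Interval)
			if idx, ok := uncoreMap[key]; ok {
				filtered[idx].Value += e.Value
			} else {
				uncoreMap[key] = len(filtered)
				filtered = append(filtered, e)
			}
		}
		return filtered
	}

	func main() {
		in := []Event{
			{"UNC_CHA_TOR_INSERTS.IA_MISS_CRD", 5.005032332, 49}, // CHA device 0
			{"UNC_CHA_TOR_INSERTS.IA_MISS_CRD", 5.005032332, 49}, // CHA device 1
			{"L1D.REPLACEMENT", 5.005032332, 22901873},           // not uncore, untouched
		}
		for _, e := range aggregate(in) {
			fmt.Printf("%s = %.0f\n", e.Event, e.Value)
		}
		// UNC_CHA_TOR_INSERTS.IA_MISS_CRD = 98
		// L1D.REPLACEMENT = 22901873
	}

Because the events no longer carry a .deviceID name suffix, identical names within an interval serve as the aggregation key, which is why the isMatchingGroup machinery could be deleted outright.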
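Group assignment, which parseEvents previously did inline, now happens per bucket in assignEventsToGroups. It relies on perf reporting events in definition order: a change of event name advances the event index, and walking past the last event of the current group definition moves to the next group. A minimal sketch with invented group sizes and event names (the bounds checks and misalignment error from the real function are omitted):

	package main

	import "fmt"

	func main() {
		groupSizes := []int{2, 3}                            // sizes of two hypothetical group definitions
		names := []string{"A", "A", "B", "C", "C", "D", "E"} // consecutive duplicates: same event, several CPUs
		groupIdx, eventIdx, prev := 0, -1, ""
		for _, name := range names {
			if name != prev { // a new name means the next defined event
				eventIdx++
				prev = name
			}
			if eventIdx == groupSizes[groupIdx] { // walked past the current group's last event
				groupIdx++
				eventIdx = 0
			}
			fmt.Printf("%s -> group %d\n", name, groupIdx)
		}
		// A, A, B land in group 0; C, C, D, E land in group 1
	}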
diff --git a/cmd/metrics/metric.go b/cmd/metrics/metric.go
index ff817cd2..0d35b57d 100644
--- a/cmd/metrics/metric.go
+++ b/cmd/metrics/metric.go
@@ -206,15 +206,19 @@ func getExpressionVariableValues(metric MetricDefinition, frame EventFrame, prev
 			err = fmt.Errorf("variable value set to -2 (shouldn't happen): %s", variableName)
 			return
 		}
-		// set the variable value to the event value divided by the perf collection time to normalize the value to 1 second
-		if len(frame.EventGroups) <= metric.Variables[variableName] {
-			err = fmt.Errorf("event groups have changed")
+		if metric.Variables[variableName] >= len(frame.EventGroups) {
+			err = fmt.Errorf("metric variable's assigned group index is out of bounds: %s", variableName)
 			return
 		}
+		if _, ok := frame.EventGroups[metric.Variables[variableName]].EventValues[variableName]; !ok {
+			err = fmt.Errorf("metric variable not found in its assigned event group: %s", variableName)
+			return
+		}
+		// normalize the value to a 1 second interval, i.e., events per second
 		variables[variableName] = frame.EventGroups[metric.Variables[variableName]].EventValues[variableName] / (frame.Timestamp - previousTimestamp)
-		// adjust cstate_core/c6-residency value if hyperthreading is enabled
+		// adjust cstate_core/c6-residency value if hyperthreading is enabled and the metric is not at CPU granularity
 		// why here? so we don't have to change the perfmon metric formula
-		if metadata.ThreadsPerCore > 1 && variableName == "cstate_core/c6-residency/" {
+		if variableName == "cstate_core/c6-residency/" && flagGranularity != granularityCPU && metadata.ThreadsPerCore > 1 {
 			variables[variableName] = variables[variableName].(float64) * float64(metadata.ThreadsPerCore)
 		}
 	}
diff --git a/cmd/metrics/metric_defs.go b/cmd/metrics/metric_defs.go
index c3c8ba93..ce5be610 100644
--- a/cmd/metrics/metric_defs.go
+++ b/cmd/metrics/metric_defs.go
@@ -109,8 +109,6 @@ func ConfigureMetrics(loadedMetrics []MetricDefinition, uncollectableEvents []st
 	reConstantInt := regexp.MustCompile(`\[(\d+)\]`)
 	for metricIdx := range loadedMetrics {
 		tmpMetric := loadedMetrics[metricIdx]
-		// abbreviate event names in metric expressions to match abbreviations used in uncollectableEvents
-		tmpMetric.Expression = abbreviateEventName(tmpMetric.Expression)
 		// skip metrics that use uncollectable events
 		foundUncollectable := false
 		for _, uncollectableEvent := range uncollectableEvents {
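On the metric.go changes: the arithmetic is small but worth seeing end to end. Each variable becomes the raw event count divided by the elapsed collection time, and cstate_core/c6-residency is additionally scaled by threads-per-core (per the comment in the patch, done here so the perfmon formula itself doesn't have to change). All numbers in this sketch are invented:

	package main

	import "fmt"

	func main() {
		const (
			eventValue        = 1.0e9     // hypothetical counter value for one interval
			timestamp         = 10.005005 // current frame timestamp, seconds
			previousTimestamp = 5.005005  // previous frame timestamp, seconds
			threadsPerCore    = 2         // hyperthreading enabled
		)
		// normalize to events per second, as getExpressionVariableValues does
		perSecond := eventValue / (timestamp - previousTimestamp) // 2e8 events/s
		fmt.Printf("normalized: %.3e events/s\n", perSecond)

		// the c6-residency special case (hyperthreading on, granularity not CPU)
		adjusted := perSecond * threadsPerCore
		fmt.Printf("c6-residency adjusted: %.3e\n", adjusted)
	}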