diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index f0ede0c40ac..f67ed60bfee 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -21,6 +21,7 @@ import ( "log" "time" + "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" @@ -29,6 +30,7 @@ import ( "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" + "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" @@ -54,10 +56,11 @@ const ( ) type envConfig struct { - Broker string `envconfig:"BROKER" required:"true"` - Namespace string `envconfig:"NAMESPACE" required:"true"` - PodName string `split_words:"true" required:"true"` - ContainerName string `split_words:"true" required:"true"` + Broker string `envconfig:"BROKER" required:"true"` + Namespace string `envconfig:"NAMESPACE" required:"true"` + // TODO: change this environment variable to something like "PodGroupName". + PodName string `envconfig:"POD_NAME" required:"true"` + ContainerName string `envconfig:"CONTAINER_NAME" required:"true"` } func main() { @@ -124,7 +127,7 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } - reporter := filter.NewStatsReporter(env.PodName, env.ContainerName) + reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index ea969a1cbee..44a52edf282 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -27,6 +27,7 @@ import ( // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" cloudevents "github.com/cloudevents/sdk-go" + "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" @@ -40,6 +41,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" @@ -67,11 +69,12 @@ const ( ) type envConfig struct { - Broker string `envconfig:"BROKER" required:"true"` - Channel string `envconfig:"CHANNEL" required:"true"` - Namespace string `envconfig:"NAMESPACE" required:"true"` - PodName string `split_words:"true" required:"true"` - ContainerName string `split_words:"true" required:"true"` + Broker string `envconfig:"BROKER" required:"true"` + Channel string `envconfig:"CHANNEL" required:"true"` + Namespace string `envconfig:"NAMESPACE" required:"true"` + // TODO: change this environment variable to something like "PodGroupName". + PodName string `envconfig:"POD_NAME" required:"true"` + ContainerName string `envconfig:"CONTAINER_NAME" required:"true"` } func main() { @@ -163,7 +166,7 @@ func main() { logger.Fatal("Unable to create CE client", zap.Error(err)) } - reporter := ingress.NewStatsReporter(env.PodName, env.ContainerName) + reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) h := &ingress.Handler{ Logger: logger, diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index 9188d358656..4f4e73b0859 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -96,15 +96,15 @@ var emptyContext = context.Background() // reporter holds cached metric objects to report filter metrics. type reporter struct { - pod string - container string + container string + uniqueName string } // NewStatsReporter creates a reporter that collects and reports filter metrics. -func NewStatsReporter(pod, container string) StatsReporter { +func NewStatsReporter(container, uniqueName string) StatsReporter { return &reporter{ - pod: pod, - container: container, + container: container, + uniqueName: uniqueName, } } @@ -115,19 +115,19 @@ func register() { Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.PodTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.PodTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, &view.View{ Description: processingTimeInMsecM.Description(), Measure: processingTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, broker.PodTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, ) if err != nil { @@ -176,8 +176,8 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C // Note that filterType and filterSource can be empty strings, so they need a special treatment. ctx, err := tag.New( emptyContext, - tag.Insert(broker.PodTagKey, r.pod), tag.Insert(broker.ContainerTagKey, r.container), + tag.Insert(broker.UniqueTagKey, r.uniqueName), tag.Insert(namespaceKey, args.ns), tag.Insert(triggerKey, args.trigger), tag.Insert(brokerKey, args.broker), diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 996c13618ed..abb95384695 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -35,7 +35,7 @@ func TestStatsReporter(t *testing.T) { filterType: "testeventtype", } - r := NewStatsReporter("testpod", "testcontainer") + r := NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ metricskey.LabelNamespaceName: "testns", @@ -43,7 +43,7 @@ func TestStatsReporter(t *testing.T) { metricskey.LabelBrokerName: "testbroker", metricskey.LabelFilterType: "testeventtype", broker.LabelContainerName: "testcontainer", - broker.LabelPodName: "testpod", + broker.LabelUniqueName: "testpod", } wantAllTags := map[string]string{} @@ -91,7 +91,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { filterType: "", } - r := NewStatsReporter("testpod", "testcontainer") + r := NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ metricskey.LabelNamespaceName: "testns", @@ -101,7 +101,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { metricskey.LabelResponseCode: "202", metricskey.LabelResponseCodeClass: "2xx", broker.LabelContainerName: "testcontainer", - broker.LabelPodName: "testpod", + broker.LabelUniqueName: "testpod", } // test ReportEventCount diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go index 0c2807f976e..4d45cea40f0 100644 --- a/pkg/broker/ingress/stats_reporter.go +++ b/pkg/broker/ingress/stats_reporter.go @@ -80,15 +80,15 @@ var emptyContext = context.Background() // Reporter holds cached metric objects to report ingress metrics. type reporter struct { - pod string - container string + container string + uniqueName string } // NewStatsReporter creates a reporter that collects and reports ingress metrics. -func NewStatsReporter(pod, container string) StatsReporter { +func NewStatsReporter(container, uniqueName string) StatsReporter { return &reporter{ - pod: pod, - container: container, + container: container, + uniqueName: uniqueName, } } @@ -99,8 +99,8 @@ func register() { eventTypeKey, responseCodeKey, responseCodeClassKey, - broker.PodTagKey, - broker.ContainerTagKey} + broker.ContainerTagKey, + broker.UniqueTagKey} // Create view to see our measurements. err := view.Register( @@ -146,8 +146,8 @@ func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) { return tag.New( emptyContext, - tag.Insert(broker.PodTagKey, r.pod), tag.Insert(broker.ContainerTagKey, r.container), + tag.Insert(broker.UniqueTagKey, r.uniqueName), tag.Insert(namespaceKey, args.ns), tag.Insert(brokerKey, args.broker), tag.Insert(eventTypeKey, args.eventType), diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go index e78b3023278..beb4986672a 100644 --- a/pkg/broker/ingress/stats_reporter_test.go +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -35,7 +35,7 @@ func TestStatsReporter(t *testing.T) { eventType: "testeventtype", } - r := NewStatsReporter("testpod", "testcontainer") + r := NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ metricskey.LabelNamespaceName: "testns", @@ -43,7 +43,7 @@ func TestStatsReporter(t *testing.T) { metricskey.LabelEventType: "testeventtype", metricskey.LabelResponseCode: "202", metricskey.LabelResponseCodeClass: "2xx", - broker.LabelPodName: "testpod", + broker.LabelUniqueName: "testpod", broker.LabelContainerName: "testcontainer", } diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index fafc5bc28b4..a2056c74a66 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -25,14 +25,14 @@ const ( // The format is an RFC3339 time in string format. For example: 2019-08-26T23:38:17.834384404Z. EventArrivalTime = "knativearrivaltime" + // LabelUniqueName is the label for the unique name per stats_reporter instance. + LabelUniqueName = "unique_name" + // LabelContainerName is the label for the immutable name of the container. LabelContainerName = "container_name" - - // LabelPodName is the label for the immutable name of the pod. - LabelPodName = "pod_name" ) var ( - PodTagKey = tag.MustNewKey(LabelPodName) ContainerTagKey = tag.MustNewKey(LabelContainerName) + UniqueTagKey = tag.MustNewKey(LabelUniqueName) )