Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"log"
"time"

"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
Expand All @@ -29,6 +30,7 @@ import (
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
Expand All @@ -54,10 +56,11 @@ const (
)

type envConfig struct {
Broker string `envconfig:"BROKER" required:"true"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
PodName string `split_words:"true" required:"true"`
ContainerName string `split_words:"true" required:"true"`
Broker string `envconfig:"BROKER" required:"true"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
// TODO: change this environment variable to something like "PodGroupName".
PodName string `envconfig:"POD_NAME" required:"true"`
ContainerName string `envconfig:"CONTAINER_NAME" required:"true"`
}

func main() {
Expand Down Expand Up @@ -124,7 +127,7 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

reporter := filter.NewStatsReporter(env.PodName, env.ContainerName)
reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
Expand Down
15 changes: 9 additions & 6 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
Expand All @@ -40,6 +41,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
Expand Down Expand Up @@ -67,11 +69,12 @@ const (
)

type envConfig struct {
Broker string `envconfig:"BROKER" required:"true"`
Channel string `envconfig:"CHANNEL" required:"true"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
PodName string `split_words:"true" required:"true"`
ContainerName string `split_words:"true" required:"true"`
Broker string `envconfig:"BROKER" required:"true"`
Channel string `envconfig:"CHANNEL" required:"true"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
// TODO: change this environment variable to something like "PodGroupName".
PodName string `envconfig:"POD_NAME" required:"true"`
ContainerName string `envconfig:"CONTAINER_NAME" required:"true"`
}

func main() {
Expand Down Expand Up @@ -163,7 +166,7 @@ func main() {
logger.Fatal("Unable to create CE client", zap.Error(err))
}

reporter := ingress.NewStatsReporter(env.PodName, env.ContainerName)
reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

h := &ingress.Handler{
Logger: logger,
Expand Down
18 changes: 9 additions & 9 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ var emptyContext = context.Background()

// reporter holds cached metric objects to report filter metrics.
type reporter struct {
pod string
container string
container string
uniqueName string
}

// NewStatsReporter creates a reporter that collects and reports filter metrics.
func NewStatsReporter(pod, container string) StatsReporter {
func NewStatsReporter(container, uniqueName string) StatsReporter {
return &reporter{
pod: pod,
container: container,
container: container,
uniqueName: uniqueName,
}
}

Expand All @@ -115,19 +115,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.PodTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.PodTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, broker.PodTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{namespaceKey, triggerKey, brokerKey, triggerFilterTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
Expand Down Expand Up @@ -176,8 +176,8 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
// Note that filterType and filterSource can be empty strings, so they need a special treatment.
ctx, err := tag.New(
emptyContext,
tag.Insert(broker.PodTagKey, r.pod),
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(namespaceKey, args.ns),
tag.Insert(triggerKey, args.trigger),
tag.Insert(brokerKey, args.broker),
Expand Down
8 changes: 4 additions & 4 deletions pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func TestStatsReporter(t *testing.T) {
filterType: "testeventtype",
}

r := NewStatsReporter("testpod", "testcontainer")
r := NewStatsReporter("testcontainer", "testpod")

wantTags := map[string]string{
metricskey.LabelNamespaceName: "testns",
metricskey.LabelTriggerName: "testtrigger",
metricskey.LabelBrokerName: "testbroker",
metricskey.LabelFilterType: "testeventtype",
broker.LabelContainerName: "testcontainer",
broker.LabelPodName: "testpod",
broker.LabelUniqueName: "testpod",
}

wantAllTags := map[string]string{}
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
filterType: "",
}

r := NewStatsReporter("testpod", "testcontainer")
r := NewStatsReporter("testcontainer", "testpod")

wantTags := map[string]string{
metricskey.LabelNamespaceName: "testns",
Expand All @@ -101,7 +101,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
metricskey.LabelResponseCode: "202",
metricskey.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
broker.LabelPodName: "testpod",
broker.LabelUniqueName: "testpod",
}

// test ReportEventCount
Expand Down
16 changes: 8 additions & 8 deletions pkg/broker/ingress/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ var emptyContext = context.Background()

// Reporter holds cached metric objects to report ingress metrics.
type reporter struct {
pod string
container string
container string
uniqueName string
}

// NewStatsReporter creates a reporter that collects and reports ingress metrics.
func NewStatsReporter(pod, container string) StatsReporter {
func NewStatsReporter(container, uniqueName string) StatsReporter {
return &reporter{
pod: pod,
container: container,
container: container,
uniqueName: uniqueName,
}
}

Expand All @@ -99,8 +99,8 @@ func register() {
eventTypeKey,
responseCodeKey,
responseCodeClassKey,
broker.PodTagKey,
broker.ContainerTagKey}
broker.ContainerTagKey,
broker.UniqueTagKey}

// Create view to see our measurements.
err := view.Register(
Expand Down Expand Up @@ -146,8 +146,8 @@ func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d
func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) {
return tag.New(
emptyContext,
tag.Insert(broker.PodTagKey, r.pod),
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(namespaceKey, args.ns),
tag.Insert(brokerKey, args.broker),
tag.Insert(eventTypeKey, args.eventType),
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/ingress/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func TestStatsReporter(t *testing.T) {
eventType: "testeventtype",
}

r := NewStatsReporter("testpod", "testcontainer")
r := NewStatsReporter("testcontainer", "testpod")

wantTags := map[string]string{
metricskey.LabelNamespaceName: "testns",
metricskey.LabelBrokerName: "testbroker",
metricskey.LabelEventType: "testeventtype",
metricskey.LabelResponseCode: "202",
metricskey.LabelResponseCodeClass: "2xx",
broker.LabelPodName: "testpod",
broker.LabelUniqueName: "testpod",
broker.LabelContainerName: "testcontainer",
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/broker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ const (
// The format is an RFC3339 time in string format. For example: 2019-08-26T23:38:17.834384404Z.
EventArrivalTime = "knativearrivaltime"

// LabelUniqueName is the label for the unique name per stats_reporter instance.
LabelUniqueName = "unique_name"

// LabelContainerName is the label for the immutable name of the container.
LabelContainerName = "container_name"

// LabelPodName is the label for the immutable name of the pod.
LabelPodName = "pod_name"
)

var (
PodTagKey = tag.MustNewKey(LabelPodName)
ContainerTagKey = tag.MustNewKey(LabelContainerName)
UniqueTagKey = tag.MustNewKey(LabelUniqueName)
)