diff --git a/Gopkg.lock b/Gopkg.lock index 0fd66fadcbc..66f06646221 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -424,6 +424,14 @@ pruneopts = "NUT" revision = "f2b4162afba35581b6d4a50d3b8f34e33c144682" +[[projects]] + digest = "1:b8870bf2606dca65dc382f4cb8b7a434f17ff36a915451bda12788e9620be368" + name = "github.com/kelseyhightower/envconfig" + packages = ["."] + pruneopts = "NUT" + revision = "f611eb38b3875cc3bd991ca91c51d06446afa14c" + version = "v1.3.0" + [[projects]] digest = "1:57d04562d05dd4500ff1e7e47f2e62b9be0531388377a3b691a012ce70b210d5" name = "github.com/knative/pkg" @@ -1375,6 +1383,7 @@ "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", + "github.com/kelseyhightower/envconfig", "github.com/knative/pkg/apis", "github.com/knative/pkg/apis/duck", "github.com/knative/pkg/apis/duck/v1alpha1", @@ -1432,6 +1441,7 @@ "k8s.io/apimachinery/pkg/util/sets", "k8s.io/apimachinery/pkg/util/sets/types", "k8s.io/apimachinery/pkg/util/uuid", + "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/util/yaml", "k8s.io/apimachinery/pkg/watch", diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 672516b69f4..726d75dda16 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -24,12 +24,12 @@ import ( "log" "net/http" "net/url" - "os" "reflect" "sync" "time" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" + "github.com/kelseyhightower/envconfig" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/broker" "github.com/knative/eventing/pkg/provisioners" @@ -58,6 +58,20 @@ var ( wg sync.WaitGroup ) +type envConfig struct { + // Channel where to send the cloudevents. + Channel string `envconfig:"CHANNEL"` + + // Broker name for this ingress. + Broker string `envconfig:"BROKER" required:"true"` + + // Namespace of this ingress. + Namespace string `envconfig:"NAMESPACE" required:"true"` + + // To indicate whether the ingress should allow any event. + AllowAny bool `envconfig:"ALLOW_ANY" required:"true"` +} + func main() { logConfig := provisioners.NewLoggingConfig() logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() @@ -65,9 +79,18 @@ func main() { flag.Parse() crlog.SetLogger(crlog.ZapLogger(false)) + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatal("Failed to process env var", zap.Error(err)) + } + logger.Info("Starting...") - mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + namespace := env.Namespace + + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{ + Namespace: namespace, + }) if err != nil { logger.Fatal("Error starting up.", zap.Error(err)) } @@ -76,23 +99,32 @@ func main() { logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err)) } - brokerName := getRequiredEnv("BROKER") + brokerName := env.Broker channelURI := &url.URL{ Scheme: "http", - Host: getRequiredEnv("CHANNEL"), + Host: env.Channel, Path: "/", } + client := mgr.GetClient() + + policySpec := &eventingv1alpha1.IngressPolicySpec{ + AllowAny: env.AllowAny, + } + + ingressPolicy := broker.NewPolicy(logger, client, policySpec, namespace, brokerName, true) + ceClient, err := cloudevents.NewDefaultClient() if err != nil { logger.Fatal("Unable to create CE client", zap.Error(err)) } h := &handler{ - logger: logger, - ceClient: ceClient, - channelURI: channelURI, - brokerName: brokerName, + logger: logger, + ceClient: ceClient, + channelURI: channelURI, + brokerName: brokerName, + ingressPolicy: ingressPolicy, } // Run the event handler with the manager. @@ -146,19 +178,12 @@ func main() { logger.Info("Done.") } -func getRequiredEnv(envKey string) string { - val, defined := os.LookupEnv(envKey) - if !defined { - log.Fatalf("required environment variable not defined '%s'", envKey) - } - return val -} - type handler struct { - logger *zap.Logger - ceClient cloudevents.Client - channelURI *url.URL - brokerName string + logger *zap.Logger + ceClient cloudevents.Client + channelURI *url.URL + brokerName string + ingressPolicy *broker.IngressPolicy } func (h *handler) Start(stopCh <-chan struct{}) error { @@ -213,10 +238,15 @@ func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * return nil } - // TODO Filter. + if h.allowEvent(ctx, event) { + ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched")) + return h.sendEvent(ctx, tctx, event) + } + return nil +} - ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched")) - return h.sendEvent(ctx, tctx, event) +func (h *handler) allowEvent(ctx context.Context, event cloudevents.Event) bool { + return h.ingressPolicy.AllowEvent(ctx, event) } func (h *handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, event cloudevents.Event) error { diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index b746e9ca24a..429ca099c03 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -101,6 +101,7 @@ func main() { eventingv1alpha1.SchemeGroupVersion.WithKind("ClusterChannelProvisioner"): &eventingv1alpha1.ClusterChannelProvisioner{}, eventingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &eventingv1alpha1.Subscription{}, eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{}, + eventingv1alpha1.SchemeGroupVersion.WithKind("EventType"): &eventingv1alpha1.EventType{}, }, Logger: logger, } diff --git a/config/200-broker-clusterrole.yaml b/config/200-broker-clusterrole.yaml index fc430c0dc8f..02dc2f7ed63 100644 --- a/config/200-broker-clusterrole.yaml +++ b/config/200-broker-clusterrole.yaml @@ -26,3 +26,20 @@ rules: - get - list - watch + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: eventing-broker-ingress +rules: + - apiGroups: + - eventing.knative.dev + resources: + - eventtypes + - eventtypes/status + verbs: + - get + - list + - watch diff --git a/config/300-eventtype.yaml b/config/300-eventtype.yaml new file mode 100644 index 00000000000..1b83fd8cd11 --- /dev/null +++ b/config/300-eventtype.yaml @@ -0,0 +1,53 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: eventtypes.eventing.knative.dev +spec: + group: eventing.knative.dev + version: v1alpha1 + names: + kind: EventType + plural: eventtypes + singular: eventtype + categories: + - all + - knative + - eventing + scope: Namespaced + subresources: + status: {} + additionalPrinterColumns: + - name: Type + type: string + JSONPath: ".spec.type" + - name: Source + type: string + JSONPath: ".spec.source" + - name: Schema + type: string + JSONPath: ".spec.schema" + - name: Broker + type: string + JSONPath: ".spec.broker" + - name: Description + type: string + JSONPath: ".spec.description" + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 2cb7d815c06..693e6193f06 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -45,7 +45,7 @@ spec: - name: BROKER_INGRESS_IMAGE value: github.com/knative/eventing/cmd/broker/ingress - name: BROKER_INGRESS_SERVICE_ACCOUNT - value: default + value: eventing-broker-ingress - name: BROKER_FILTER_IMAGE value: github.com/knative/eventing/cmd/broker/filter - name: BROKER_FILTER_SERVICE_ACCOUNT diff --git a/docs/registry/example_broker_policies.yaml b/docs/registry/example_broker_policies.yaml new file mode 100644 index 00000000000..fedaeeb42b8 --- /dev/null +++ b/docs/registry/example_broker_policies.yaml @@ -0,0 +1,36 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is a list of example Brokers with different policies. Each could be used independently. + +--- + +# By not specifying an IngressPolicy, the default will accept any event. + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Broker +metadata: + name: broker-allow-any + +--- + +# By setting the IngressPolicy to not allowAny, the broker will accept only events in the Registry. + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Broker +metadata: + name: broker-allow-registered +spec: + ingressPolicy: + allowAny: false diff --git a/docs/registry/example_eventtype.yaml b/docs/registry/example_eventtype.yaml new file mode 100644 index 00000000000..844fcf67f1b --- /dev/null +++ b/docs/registry/example_eventtype.yaml @@ -0,0 +1,49 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +# This EventType creates an event type with type 'com.github.pull_request', source +# 'github.com', for the 'broker-allow-registered' Broker. + +apiVersion: eventing.knative.dev/v1alpha1 +kind: EventType +metadata: + name: com.github.pullrequest +spec: + type: com.github.pull_request + source: github.com + broker: broker-allow-registered + description: "BitBucket Repo Push" + +--- + +# This Trigger matches all events of type 'com.github.pull_request' and source +# 'github.com', that are sent to the 'broker-allow-registered' Broker. + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Trigger +metadata: + name: filtering-event-type +spec: + filter: + sourceAndType: + type: com.github.pull_request + source: github.com + broker: broker-allow-registered + subscriber: + ref: + apiVersion: serving.knative.dev/v1alpha1 + kind: Service + name: message-dumper diff --git a/pkg/apis/eventing/v1alpha1/broker_defaults.go b/pkg/apis/eventing/v1alpha1/broker_defaults.go index 6351fea1a2c..388ee3e7aec 100644 --- a/pkg/apis/eventing/v1alpha1/broker_defaults.go +++ b/pkg/apis/eventing/v1alpha1/broker_defaults.go @@ -23,5 +23,10 @@ func (b *Broker) SetDefaults(ctx context.Context) { } func (bs *BrokerSpec) SetDefaults(ctx context.Context) { - // None + if bs.IngressPolicy == nil { + // Setting as default to allow any event. + bs.IngressPolicy = &IngressPolicySpec{ + AllowAny: true, + } + } } diff --git a/pkg/apis/eventing/v1alpha1/broker_defaults_test.go b/pkg/apis/eventing/v1alpha1/broker_defaults_test.go index f8dc1302150..dcee2377dee 100644 --- a/pkg/apis/eventing/v1alpha1/broker_defaults_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_defaults_test.go @@ -19,10 +19,46 @@ package v1alpha1 import ( "context" "testing" + + "github.com/google/go-cmp/cmp" ) -// No-op test because method does nothing. func TestBrokerDefaults(t *testing.T) { - b := Broker{} - b.SetDefaults(context.TODO()) + testCases := map[string]struct { + initial Broker + expected Broker + }{ + "nil ingress": { + initial: Broker{}, + expected: Broker{ + Spec: BrokerSpec{ + IngressPolicy: &IngressPolicySpec{ + AllowAny: true, + }, + }, + }, + }, + "allow any not set": { + initial: Broker{ + Spec: BrokerSpec{ + IngressPolicy: &IngressPolicySpec{}, + }, + }, + expected: Broker{ + Spec: BrokerSpec{ + IngressPolicy: &IngressPolicySpec{ + AllowAny: false, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + tc.initial.SetDefaults(context.TODO()) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatalf("Unexpected defaults (-want, +got): %s", diff) + } + }) + } } diff --git a/pkg/apis/eventing/v1alpha1/broker_types.go b/pkg/apis/eventing/v1alpha1/broker_types.go index c6e6cd09c1b..12c8995aeab 100644 --- a/pkg/apis/eventing/v1alpha1/broker_types.go +++ b/pkg/apis/eventing/v1alpha1/broker_types.go @@ -55,6 +55,15 @@ type BrokerSpec struct { // // +optional ChannelTemplate *ChannelSpec `json:"channelTemplate,omitempty"` + + // IngressPolicy defines the Broker's policy regarding the events it can accept into the mesh or not. + // +optional + IngressPolicy *IngressPolicySpec `json:"ingressPolicy,omitempty"` +} + +type IngressPolicySpec struct { + // AllowAny, if set to true accepts any message into the mesh. If set to false, only allows pre-registered events. + AllowAny bool `json:"allowAny,omitempty"` } // BrokerStatus represents the current state of a Broker. diff --git a/pkg/apis/eventing/v1alpha1/broker_validation.go b/pkg/apis/eventing/v1alpha1/broker_validation.go index 483e1ffab83..72739a7e39d 100644 --- a/pkg/apis/eventing/v1alpha1/broker_validation.go +++ b/pkg/apis/eventing/v1alpha1/broker_validation.go @@ -27,8 +27,13 @@ func (b *Broker) Validate(ctx context.Context) *apis.FieldError { } func (bs *BrokerSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + if bs.IngressPolicy == nil { + fe := apis.ErrMissingField("ingressPolicy") + errs = errs.Also(fe) + } // TODO validate that the channelTemplate only specifies the provisioner and arguments. - return nil + return errs } func (b *Broker) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError { @@ -36,5 +41,6 @@ func (b *Broker) CheckImmutableFields(ctx context.Context, og apis.Immutable) *a // changing it will normally not have the desired effect of changing the Channel inside the // Broker. It would have an effect if the existing Channel was then deleted, the newly created // Channel would use the new spec.channelTemplate. + // Similar thing would happen with spec.ingressPolicy. return nil } diff --git a/pkg/apis/eventing/v1alpha1/broker_validation_test.go b/pkg/apis/eventing/v1alpha1/broker_validation_test.go index 9d737bf3173..795b4117cc0 100644 --- a/pkg/apis/eventing/v1alpha1/broker_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_validation_test.go @@ -19,18 +19,51 @@ package v1alpha1 import ( "context" "testing" + + "github.com/google/go-cmp/cmp" + "github.com/knative/pkg/apis" ) -// No-op test because method does nothing. func TestBrokerValidation(t *testing.T) { - b := Broker{} - _ = b.Validate(context.TODO()) + name := "invalid ingress policy spec" + broker := &Broker{Spec: BrokerSpec{}} + + want := &apis.FieldError{ + Paths: []string{"spec.ingressPolicy"}, + Message: "missing field(s)", + } + + t.Run(name, func(t *testing.T) { + got := broker.Validate(context.TODO()) + if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { + t.Errorf("Broker.Validate (-want, +got) = %v", diff) + } + }) } -// No-op test because method does nothing. func TestBrokerSpecValidation(t *testing.T) { - bs := BrokerSpec{} - _ = bs.Validate(context.TODO()) + tests := []struct { + name string + bs *BrokerSpec + want *apis.FieldError + }{{ + name: "invalid broker spec", + bs: &BrokerSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("ingressPolicy") + return fe + }(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.bs.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: Validate BrokerSpec (-want, +got) = %v", test.name, diff) + } + }) + } } // No-op test because method does nothing. diff --git a/pkg/apis/eventing/v1alpha1/eventtype_defaults.go b/pkg/apis/eventing/v1alpha1/eventtype_defaults.go new file mode 100644 index 00000000000..eee2cec8c46 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_defaults.go @@ -0,0 +1,29 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import "context" + +func (et *EventType) SetDefaults(ctx context.Context) { + et.Spec.SetDefaults(ctx) +} + +func (ets *EventTypeSpec) SetDefaults(ctx context.Context) { + if ets.Broker == "" { + ets.Broker = "default" + } +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_defaults_test.go b/pkg/apis/eventing/v1alpha1/eventtype_defaults_test.go new file mode 100644 index 00000000000..6ac447be495 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_defaults_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestEventTypeDefaults(t *testing.T) { + testCases := map[string]struct { + initial EventType + expected EventType + }{ + "nil spec": { + initial: EventType{}, + expected: EventType{ + Spec: EventTypeSpec{ + Broker: "default", + }, + }, + }, + "broker empty": { + initial: EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "", + Schema: "test-schema", + }, + }, + expected: EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "default", + Schema: "test-schema", + }, + }, + }, + "broker not set": { + initial: EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Schema: "test-schema", + }, + }, + expected: EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "default", + Schema: "test-schema", + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + tc.initial.SetDefaults(context.TODO()) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatalf("Unexpected defaults (-want, +got): %s", diff) + } + }) + } +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_lifecycle.go b/pkg/apis/eventing/v1alpha1/eventtype_lifecycle.go new file mode 100644 index 00000000000..5cef6862a6c --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_lifecycle.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + +var eventTypeCondSet = duckv1alpha1.NewLivingConditionSet(EventTypeConditionBrokerExists, EventTypeConditionBrokerReady) + +const ( + EventTypeConditionReady = duckv1alpha1.ConditionReady + EventTypeConditionBrokerExists duckv1alpha1.ConditionType = "BrokerExists" + EventTypeConditionBrokerReady duckv1alpha1.ConditionType = "BrokerReady" +) + +// GetCondition returns the condition currently associated with the given type, or nil. +func (et *EventTypeStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { + return eventTypeCondSet.Manage(et).GetCondition(t) +} + +// IsReady returns true if the resource is ready overall. +func (et *EventTypeStatus) IsReady() bool { + return eventTypeCondSet.Manage(et).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (et *EventTypeStatus) InitializeConditions() { + eventTypeCondSet.Manage(et).InitializeConditions() +} + +func (et *EventTypeStatus) MarkBrokerExists() { + eventTypeCondSet.Manage(et).MarkTrue(EventTypeConditionBrokerExists) +} + +func (et *EventTypeStatus) MarkBrokerDoesNotExist() { + eventTypeCondSet.Manage(et).MarkFalse(EventTypeConditionBrokerExists, "BrokerDoesNotExist", "Broker does not exist") +} + +func (et *EventTypeStatus) MarkBrokerReady() { + eventTypeCondSet.Manage(et).MarkTrue(EventTypeConditionBrokerReady) +} + +func (et *EventTypeStatus) MarkBrokerNotReady() { + eventTypeCondSet.Manage(et).MarkFalse(EventTypeConditionBrokerReady, "BrokerNotReady", "Broker is not ready") +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/eventtype_lifecycle_test.go new file mode 100644 index 00000000000..d88c3fcb308 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_lifecycle_test.go @@ -0,0 +1,247 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +var ( + trueValue = true + falseValue = false +) + +var ( + eventTypeConditionReady = duckv1alpha1.Condition{ + Type: EventTypeConditionReady, + Status: corev1.ConditionTrue, + } + + eventTypeConditionBrokerExists = duckv1alpha1.Condition{ + Type: EventTypeConditionBrokerExists, + Status: corev1.ConditionTrue, + } + + eventTypeConditionBrokerReady = duckv1alpha1.Condition{ + Type: EventTypeConditionBrokerReady, + Status: corev1.ConditionTrue, + } +) + +func TestEventTypeGetCondition(t *testing.T) { + tests := []struct { + name string + ets *EventTypeStatus + condQuery duckv1alpha1.ConditionType + want *duckv1alpha1.Condition + }{{ + name: "single condition", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{ + eventTypeConditionReady, + }, + }, + }, + condQuery: duckv1alpha1.ConditionReady, + want: &eventTypeConditionReady, + }, { + name: "broker exists condition", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{ + eventTypeConditionBrokerExists, + }, + }, + }, + condQuery: EventTypeConditionBrokerExists, + want: &eventTypeConditionBrokerExists, + }, { + name: "multiple conditions, condition true", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{ + eventTypeConditionBrokerExists, + eventTypeConditionBrokerReady, + }, + }, + }, + condQuery: EventTypeConditionBrokerReady, + want: &eventTypeConditionBrokerReady, + }, { + name: "unknown condition", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{ + eventTypeConditionBrokerReady, + eventTypeConditionReady, + }, + }, + }, + condQuery: duckv1alpha1.ConditionType("foo"), + want: nil, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ets.GetCondition(test.condQuery) + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + } +} + +func TestEventTypeInitializeConditions(t *testing.T) { + tests := []struct { + name string + ets *EventTypeStatus + want *EventTypeStatus + }{{ + name: "empty", + ets: &EventTypeStatus{}, + want: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{{ + Type: EventTypeConditionBrokerExists, + Status: corev1.ConditionUnknown, + }, { + Type: EventTypeConditionBrokerReady, + Status: corev1.ConditionUnknown, + }, { + Type: EventTypeConditionReady, + Status: corev1.ConditionUnknown, + }, + }, + }, + }, + }, { + name: "one false", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{{ + Type: EventTypeConditionBrokerExists, + Status: corev1.ConditionFalse, + }}, + }, + }, + want: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{{ + Type: EventTypeConditionBrokerExists, + Status: corev1.ConditionFalse, + }, { + Type: EventTypeConditionBrokerReady, + Status: corev1.ConditionUnknown, + }, { + Type: EventTypeConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one true", + ets: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{{ + Type: EventTypeConditionBrokerReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + want: &EventTypeStatus{ + Status: duckv1alpha1.Status{ + Conditions: []duckv1alpha1.Condition{{ + Type: EventTypeConditionBrokerExists, + Status: corev1.ConditionUnknown, + }, { + Type: EventTypeConditionBrokerReady, + Status: corev1.ConditionTrue, + }, { + Type: EventTypeConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.ets.InitializeConditions() + if diff := cmp.Diff(test.want, test.ets, ignoreAllButTypeAndStatus); diff != "" { + t.Errorf("unexpected conditions (-want, +got) = %v", diff) + } + }) + } +} + +func TestEventTypeIsReady(t *testing.T) { + tests := []struct { + name string + markBrokerExists *bool + markBrokerReady *bool + wantReady bool + }{{ + name: "all happy", + markBrokerExists: &trueValue, + markBrokerReady: &trueValue, + wantReady: true, + }, { + name: "broker exist sad", + markBrokerExists: &falseValue, + markBrokerReady: &trueValue, + wantReady: false, + }, { + name: "broker ready sad", + markBrokerExists: &trueValue, + markBrokerReady: &falseValue, + wantReady: false, + }, { + name: "all sad", + markBrokerExists: &falseValue, + markBrokerReady: &falseValue, + wantReady: false, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ets := &EventTypeStatus{} + if test.markBrokerExists != nil { + if *test.markBrokerExists { + ets.MarkBrokerExists() + } else { + ets.MarkBrokerDoesNotExist() + } + } + if test.markBrokerReady != nil { + if *test.markBrokerReady { + ets.MarkBrokerReady() + } else { + ets.MarkBrokerNotReady() + } + } + + got := ets.IsReady() + if test.wantReady != got { + t.Errorf("unexpected readiness: want %v, got %v", test.wantReady, got) + } + }) + } +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_types.go b/pkg/apis/eventing/v1alpha1/eventtype_types.go new file mode 100644 index 00000000000..f688c6ebba2 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_types.go @@ -0,0 +1,82 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "github.com/knative/pkg/apis" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/webhook" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type EventType struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec defines the desired state of the EventType. + Spec EventTypeSpec `json:"spec,omitempty"` + + // Status represents the current state of the EventType. + // This data may be out of date. + // +optional + Status EventTypeStatus `json:"status,omitempty"` +} + +// Check that EventType can be validated, can be defaulted, and has immutable fields. +var _ apis.Validatable = (*EventType)(nil) +var _ apis.Defaultable = (*EventType)(nil) +var _ apis.Immutable = (*EventType)(nil) +var _ runtime.Object = (*EventType)(nil) +var _ webhook.GenericCRD = (*EventType)(nil) + +type EventTypeSpec struct { + // Type represents the CloudEvents type. It is authoritative. + Type string `json:"type"` + // Source is a valid URI, it represents the CloudEvents source. + Source string `json:"source,omitempty"` + // Schema is a valid URI, it represents the CloudEvents schema attribute. + // It may be a JSON schema, a protobuf schema, etc. It is optional. + // +optional + Schema string `json:"schema,omitempty"` + // Broker refers to the Broker that can provide the EventType. + Broker string `json:"broker"` + // Description is an optional field used to describe the EventType, in any meaningful way. + Description string `json:description,omitempty` +} + +// EventTypeStatus represents the current state of a EventType. +type EventTypeStatus struct { + // inherits duck/v1alpha1 Status, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. + // * Conditions - the latest available observations of a resource's current state. + duckv1alpha1.Status `json:",inline"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// EventTypeList is a collection of EventTypes. +type EventTypeList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + Items []EventType `json:"items"` +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_validation.go b/pkg/apis/eventing/v1alpha1/eventtype_validation.go new file mode 100644 index 00000000000..279e586cadc --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_validation.go @@ -0,0 +1,69 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + "github.com/knative/pkg/apis" + "github.com/knative/pkg/kmp" +) + +func (et *EventType) Validate(ctx context.Context) *apis.FieldError { + return et.Spec.Validate(ctx).ViaField("spec") +} + +func (ets *EventTypeSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + if ets.Type == "" { + fe := apis.ErrMissingField("type") + errs = errs.Also(fe) + } + if ets.Broker == "" { + fe := apis.ErrMissingField("broker") + errs = errs.Also(fe) + } + return errs +} + +func (et *EventType) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError { + if og == nil { + return nil + } + + original, ok := og.(*EventType) + if !ok { + return &apis.FieldError{Message: "The provided original was not an EventType"} + } + + // All fields immutable. + if diff, err := kmp.ShortDiff(original.Spec, et.Spec); err != nil { + return &apis.FieldError{ + Message: "Failed to diff EventType", + Paths: []string{"spec"}, + Details: err.Error(), + } + } else if diff != "" { + return &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: diff, + } + } + + return nil +} diff --git a/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go b/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go new file mode 100644 index 00000000000..eaaa897ef14 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go @@ -0,0 +1,238 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/knative/pkg/apis" +) + +func TestEventTypeValidation(t *testing.T) { + name := "invalid type and broker" + broker := &EventType{Spec: EventTypeSpec{}} + + want := &apis.FieldError{ + Paths: []string{"spec.type", "spec.broker"}, + Message: "missing field(s)", + } + + t.Run(name, func(t *testing.T) { + got := broker.Validate(context.TODO()) + if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { + t.Errorf("EventType.Validate (-want, +got) = %v", diff) + } + }) +} + +func TestEventTypeSpecValidation(t *testing.T) { + tests := []struct { + name string + ets *EventTypeSpec + want *apis.FieldError + }{{ + name: "invalid eventtype spec", + ets: &EventTypeSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("type", "broker") + return fe + }(), + }, { + name: "invalid eventtype type", + ets: &EventTypeSpec{ + Broker: "test-broker", + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("type") + return fe + }(), + }, { + name: "invalid eventtype broker", + ets: &EventTypeSpec{ + Type: "test-type", + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("broker") + return fe + }(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ets.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: Validate EventTypeSpec (-want, +got) = %v", test.name, diff) + } + }) + } +} + +func TestEventTypeImmutableFields(t *testing.T) { + tests := []struct { + name string + current apis.Immutable + original apis.Immutable + want *apis.FieldError + }{{ + name: "good (no change)", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "test-broker", + Schema: "test-schema", + }, + }, + original: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "test-broker", + Schema: "test-schema", + }, + }, + want: nil, + }, { + name: "new nil is ok", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Source: "test-source", + Broker: "test-broker", + Schema: "test-schema", + }, + }, + original: nil, + want: nil, + }, { + name: "invalid type", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + }, + }, + original: &Trigger{}, + want: &apis.FieldError{ + Message: "The provided original was not an EventType", + }, + }, { + name: "bad (broker change)", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + }, + }, + original: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "original-broker", + }, + }, + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1alpha1.EventTypeSpec}.Broker: + -: "original-broker" + +: "test-broker" +`, + }, + }, { + name: "bad (type change)", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + }, + }, + original: &EventType{ + Spec: EventTypeSpec{ + Type: "original-type", + Broker: "test-broker", + }, + }, + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1alpha1.EventTypeSpec}.Type: + -: "original-type" + +: "test-type" +`, + }, + }, { + name: "bad (schema change)", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + Schema: "test-schema", + }, + }, + original: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + Schema: "original-schema", + }, + }, + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1alpha1.EventTypeSpec}.Schema: + -: "original-schema" + +: "test-schema" +`, + }, + }, { + name: "bad (source change)", + current: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + Source: "test-source", + }, + }, + original: &EventType{ + Spec: EventTypeSpec{ + Type: "test-type", + Broker: "test-broker", + Source: "original-source", + }, + }, + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1alpha1.EventTypeSpec}.Source: + -: "original-source" + +: "test-source" +`, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.current.CheckImmutableFields(context.TODO(), test.original) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("CheckImmutableFields (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/apis/eventing/v1alpha1/register.go b/pkg/apis/eventing/v1alpha1/register.go index fb3a5292623..977cad56aaf 100644 --- a/pkg/apis/eventing/v1alpha1/register.go +++ b/pkg/apis/eventing/v1alpha1/register.go @@ -55,6 +55,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &SubscriptionList{}, &Trigger{}, &TriggerList{}, + &EventType{}, + &EventTypeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 9807e6caf2a..c796ceeaf6a 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -96,6 +96,11 @@ func (in *BrokerSpec) DeepCopyInto(out *BrokerSpec) { *out = new(ChannelSpec) (*in).DeepCopyInto(*out) } + if in.IngressPolicy != nil { + in, out := &in.IngressPolicy, &out.IngressPolicy + *out = new(IngressPolicySpec) + **out = **in + } return } @@ -342,6 +347,116 @@ func (in *ClusterChannelProvisionerStatus) DeepCopy() *ClusterChannelProvisioner return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventType) DeepCopyInto(out *EventType) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventType. +func (in *EventType) DeepCopy() *EventType { + if in == nil { + return nil + } + out := new(EventType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *EventType) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventTypeList) DeepCopyInto(out *EventTypeList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]EventType, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventTypeList. +func (in *EventTypeList) DeepCopy() *EventTypeList { + if in == nil { + return nil + } + out := new(EventTypeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *EventTypeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventTypeSpec) DeepCopyInto(out *EventTypeSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventTypeSpec. +func (in *EventTypeSpec) DeepCopy() *EventTypeSpec { + if in == nil { + return nil + } + out := new(EventTypeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventTypeStatus) DeepCopyInto(out *EventTypeStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventTypeStatus. +func (in *EventTypeStatus) DeepCopy() *EventTypeStatus { + if in == nil { + return nil + } + out := new(EventTypeStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IngressPolicySpec) DeepCopyInto(out *IngressPolicySpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressPolicySpec. +func (in *IngressPolicySpec) DeepCopy() *IngressPolicySpec { + if in == nil { + return nil + } + out := new(IngressPolicySpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplyStrategy) DeepCopyInto(out *ReplyStrategy) { *out = *in diff --git a/pkg/broker/context.go b/pkg/broker/context.go index a06be9244d4..fd336f5123e 100644 --- a/pkg/broker/context.go +++ b/pkg/broker/context.go @@ -22,7 +22,7 @@ import ( "net/url" "strings" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" "k8s.io/apimachinery/pkg/util/sets" ) diff --git a/pkg/broker/ingress.go b/pkg/broker/ingress.go new file mode 100644 index 00000000000..4c8a2bdb655 --- /dev/null +++ b/pkg/broker/ingress.go @@ -0,0 +1,119 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package broker + +import ( + "context" + "strings" + + "go.uber.org/zap" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + // EventType not found error. + notFound = k8serrors.NewNotFound(eventingv1alpha1.Resource("eventtype"), "") +) + +// IngressPolicy parses Cloud Events, determines if they pass the Broker's policy, and sends them downstream. +type IngressPolicy struct { + logger *zap.SugaredLogger + client client.Client + namespace string + broker string + spec *eventingv1alpha1.IngressPolicySpec + // This bool flag is for UT purposes only. + async bool +} + +// NewPolicy creates an IngressPolicy for a particular Broker. +func NewPolicy(logger *zap.Logger, client client.Client, spec *eventingv1alpha1.IngressPolicySpec, namespace, broker string, async bool) *IngressPolicy { + return &IngressPolicy{ + logger: logger.Sugar(), + client: client, + namespace: namespace, + broker: broker, + spec: spec, + async: async, + } +} + +// AllowEvent filters events based on the configured policy. +func (p *IngressPolicy) AllowEvent(ctx context.Context, event cloudevents.Event) bool { + // 1. If allowAny is set to true, then all events are allowed to enter the mesh. + // 2. If allowAny is set to false, then the event is only accepted if it's already in the Broker's registry. + if p.spec.AllowAny { + p.logger.Debugf("EventType %q received, Accept", event.Type()) + return true + } + return p.isRegistered(ctx, event) +} + +// isRegistered returns whether the EventType corresponding to the CloudEvent is available in the Registry. +func (p *IngressPolicy) isRegistered(ctx context.Context, event cloudevents.Event) bool { + _, err := p.getEventType(ctx, event) + if k8serrors.IsNotFound(err) { + p.logger.Debugf("EventType %q not found, Reject", event.Type()) + return false + } else if err != nil { + p.logger.Errorf("Error retrieving EventType %q, Reject: %v", event.Type(), err) + return false + } + p.logger.Debugf("EventType %q is registered, Accept", event.Type()) + return true +} + +// getEventType retrieves the EventType from the Registry for the given cloudevents.Event. +// If it is not found, it returns an error. +func (p *IngressPolicy) getEventType(ctx context.Context, event cloudevents.Event) (*eventingv1alpha1.EventType, error) { + opts := &client.ListOptions{ + Namespace: p.namespace, + // Set Raw because if we need to get more than one page, then we will put the continue token + // into opts.Raw.Continue. + // TODO filter by Broker label. + Raw: &metav1.ListOptions{}, + } + + for { + etl := &eventingv1alpha1.EventTypeList{} + err := p.client.List(ctx, opts, etl) + if err != nil { + return nil, err + } + for _, et := range etl.Items { + if et.Spec.Broker == p.broker { + // Matching on type, source, and schemaURL. + // Note that if we the CloudEvent comes with a very specific source (i.e., without the split of + // source and subject proposed in v0.3), the EventType most probably won't be there. + if strings.EqualFold(et.Spec.Type, event.Type()) && strings.EqualFold(et.Spec.Source, event.Source()) && strings.EqualFold(et.Spec.Schema, event.SchemaURL()) { + return &et, nil + } + } + } + if etl.Continue != "" { + opts.Raw.Continue = etl.Continue + } else { + return nil, notFound + } + } +} diff --git a/pkg/broker/ingress_test.go b/pkg/broker/ingress_test.go new file mode 100644 index 00000000000..9dabb5585fc --- /dev/null +++ b/pkg/broker/ingress_test.go @@ -0,0 +1,159 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package broker + +import ( + "context" + "errors" + "fmt" + "net/url" + "testing" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "go.uber.org/zap" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +const ( + broker = "test-broker" + namespace = "test-namespace" + testType = "test-type" + otherType = "other-test-type" + testSource = "/test-source" +) + +func init() { + // Add types to scheme. + _ = eventingv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestIngress(t *testing.T) { + + testCases := map[string]struct { + eventTypes []*eventingv1alpha1.EventType + mocks controllertesting.Mocks + event cloudevents.Event + policySpec *eventingv1alpha1.IngressPolicySpec + want bool + // TODO add wantPresent to check creation of an EventType. + }{ + "allow any, accept": { + policySpec: &eventingv1alpha1.IngressPolicySpec{ + AllowAny: true, + }, + event: makeCloudEvent(nil), + want: true, + }, + "allow registered, error listing types, reject": { + policySpec: &eventingv1alpha1.IngressPolicySpec{ + AllowAny: false, + }, + event: makeCloudEvent(nil), + mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("error listing types") + }, + }, + }, + want: false, + }, + "allow registered, event not found, reject": { + policySpec: &eventingv1alpha1.IngressPolicySpec{ + AllowAny: false, + }, + event: makeCloudEvent(nil), + eventTypes: []*eventingv1alpha1.EventType{ + makeEventType(otherType, testSource), + }, + want: false, + }, + "allow registered, event registered, accept": { + policySpec: &eventingv1alpha1.IngressPolicySpec{ + AllowAny: false, + }, + event: makeCloudEvent(nil), + eventTypes: []*eventingv1alpha1.EventType{ + makeEventType(testType, testSource), + }, + want: true, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + ctx := context.TODO() + objs := make([]runtime.Object, 0, len(tc.eventTypes)) + for _, et := range tc.eventTypes { + objs = append(objs, et) + } + + c := newClient(objs, tc.mocks) + policy := NewPolicy(zap.NewNop(), c, tc.policySpec, namespace, broker, false) + + got := policy.AllowEvent(ctx, tc.event) + + if tc.want != got { + t.Errorf("want %t, got %t", tc.want, got) + } + + }) + } +} + +func newClient(initial []runtime.Object, mocks controllertesting.Mocks) *controllertesting.MockClient { + innerClient := fake.NewFakeClient(initial...) + return controllertesting.NewMockClient(innerClient, mocks) +} + +func makeCloudEvent(extensions map[string]interface{}) cloudevents.Event { + return cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: testType, + Source: types.URLRef{ + URL: url.URL{ + Path: testSource, + }, + }, + ContentType: cloudevents.StringOfApplicationJSON(), + Extensions: extensions, + }.AsV02(), + } +} + +func makeEventType(eventType, eventSource string) *eventingv1alpha1.EventType { + return &eventingv1alpha1.EventType{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: fmt.Sprintf("%s-", eventType), + Namespace: namespace, + }, + Spec: eventingv1alpha1.EventTypeSpec{ + Type: eventType, + Broker: broker, + Source: eventSource, + }, + } +} diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 675af6a3ca9..86b5c4381ef 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -23,7 +23,7 @@ import ( "net/url" "time" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/trigger/path" "go.uber.org/zap" @@ -214,8 +214,7 @@ func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, event *cl return false } filterSource := ts.Filter.SourceAndType.Source - s := event.Context.AsV01().Source - actualSource := s.String() + actualSource := event.Source() if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource { r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource)) return false diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index ed14a6c0041..e299178f969 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -26,7 +26,7 @@ import ( "strings" "testing" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/google/go-cmp/cmp" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" diff --git a/pkg/broker/ttl.go b/pkg/broker/ttl.go index 8d7ccdef39f..d71f425b8a7 100644 --- a/pkg/broker/ttl.go +++ b/pkg/broker/ttl.go @@ -17,7 +17,7 @@ package broker import ( - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" ) const ( diff --git a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventing_client.go b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventing_client.go index 68722de4481..4af33411504 100644 --- a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventing_client.go +++ b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventing_client.go @@ -30,6 +30,7 @@ type EventingV1alpha1Interface interface { BrokersGetter ChannelsGetter ClusterChannelProvisionersGetter + EventTypesGetter SubscriptionsGetter TriggersGetter } @@ -51,6 +52,10 @@ func (c *EventingV1alpha1Client) ClusterChannelProvisioners() ClusterChannelProv return newClusterChannelProvisioners(c) } +func (c *EventingV1alpha1Client) EventTypes(namespace string) EventTypeInterface { + return newEventTypes(c, namespace) +} + func (c *EventingV1alpha1Client) Subscriptions(namespace string) SubscriptionInterface { return newSubscriptions(c, namespace) } diff --git a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventtype.go b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventtype.go new file mode 100644 index 00000000000..ff71fb35749 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/eventtype.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// EventTypesGetter has a method to return a EventTypeInterface. +// A group's client should implement this interface. +type EventTypesGetter interface { + EventTypes(namespace string) EventTypeInterface +} + +// EventTypeInterface has methods to work with EventType resources. +type EventTypeInterface interface { + Create(*v1alpha1.EventType) (*v1alpha1.EventType, error) + Update(*v1alpha1.EventType) (*v1alpha1.EventType, error) + UpdateStatus(*v1alpha1.EventType) (*v1alpha1.EventType, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.EventType, error) + List(opts v1.ListOptions) (*v1alpha1.EventTypeList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventType, err error) + EventTypeExpansion +} + +// eventTypes implements EventTypeInterface +type eventTypes struct { + client rest.Interface + ns string +} + +// newEventTypes returns a EventTypes +func newEventTypes(c *EventingV1alpha1Client, namespace string) *eventTypes { + return &eventTypes{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the eventType, and returns the corresponding eventType object, and an error if there is any. +func (c *eventTypes) Get(name string, options v1.GetOptions) (result *v1alpha1.EventType, err error) { + result = &v1alpha1.EventType{} + err = c.client.Get(). + Namespace(c.ns). + Resource("eventtypes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of EventTypes that match those selectors. +func (c *eventTypes) List(opts v1.ListOptions) (result *v1alpha1.EventTypeList, err error) { + result = &v1alpha1.EventTypeList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("eventtypes"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested eventTypes. +func (c *eventTypes) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("eventtypes"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a eventType and creates it. Returns the server's representation of the eventType, and an error, if there is any. +func (c *eventTypes) Create(eventType *v1alpha1.EventType) (result *v1alpha1.EventType, err error) { + result = &v1alpha1.EventType{} + err = c.client.Post(). + Namespace(c.ns). + Resource("eventtypes"). + Body(eventType). + Do(). + Into(result) + return +} + +// Update takes the representation of a eventType and updates it. Returns the server's representation of the eventType, and an error, if there is any. +func (c *eventTypes) Update(eventType *v1alpha1.EventType) (result *v1alpha1.EventType, err error) { + result = &v1alpha1.EventType{} + err = c.client.Put(). + Namespace(c.ns). + Resource("eventtypes"). + Name(eventType.Name). + Body(eventType). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *eventTypes) UpdateStatus(eventType *v1alpha1.EventType) (result *v1alpha1.EventType, err error) { + result = &v1alpha1.EventType{} + err = c.client.Put(). + Namespace(c.ns). + Resource("eventtypes"). + Name(eventType.Name). + SubResource("status"). + Body(eventType). + Do(). + Into(result) + return +} + +// Delete takes name of the eventType and deletes it. Returns an error if one occurs. +func (c *eventTypes) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("eventtypes"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *eventTypes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("eventtypes"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched eventType. +func (c *eventTypes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventType, err error) { + result = &v1alpha1.EventType{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("eventtypes"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventing_client.go b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventing_client.go index 52759cda929..75da888a5e6 100644 --- a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventing_client.go +++ b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventing_client.go @@ -40,6 +40,10 @@ func (c *FakeEventingV1alpha1) ClusterChannelProvisioners() v1alpha1.ClusterChan return &FakeClusterChannelProvisioners{c} } +func (c *FakeEventingV1alpha1) EventTypes(namespace string) v1alpha1.EventTypeInterface { + return &FakeEventTypes{c, namespace} +} + func (c *FakeEventingV1alpha1) Subscriptions(namespace string) v1alpha1.SubscriptionInterface { return &FakeSubscriptions{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventtype.go b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventtype.go new file mode 100644 index 00000000000..4958426561e --- /dev/null +++ b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake/fake_eventtype.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeEventTypes implements EventTypeInterface +type FakeEventTypes struct { + Fake *FakeEventingV1alpha1 + ns string +} + +var eventtypesResource = schema.GroupVersionResource{Group: "eventing.knative.dev", Version: "v1alpha1", Resource: "eventtypes"} + +var eventtypesKind = schema.GroupVersionKind{Group: "eventing.knative.dev", Version: "v1alpha1", Kind: "EventType"} + +// Get takes name of the eventType, and returns the corresponding eventType object, and an error if there is any. +func (c *FakeEventTypes) Get(name string, options v1.GetOptions) (result *v1alpha1.EventType, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(eventtypesResource, c.ns, name), &v1alpha1.EventType{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.EventType), err +} + +// List takes label and field selectors, and returns the list of EventTypes that match those selectors. +func (c *FakeEventTypes) List(opts v1.ListOptions) (result *v1alpha1.EventTypeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(eventtypesResource, eventtypesKind, c.ns, opts), &v1alpha1.EventTypeList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.EventTypeList{ListMeta: obj.(*v1alpha1.EventTypeList).ListMeta} + for _, item := range obj.(*v1alpha1.EventTypeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested eventTypes. +func (c *FakeEventTypes) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(eventtypesResource, c.ns, opts)) + +} + +// Create takes the representation of a eventType and creates it. Returns the server's representation of the eventType, and an error, if there is any. +func (c *FakeEventTypes) Create(eventType *v1alpha1.EventType) (result *v1alpha1.EventType, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(eventtypesResource, c.ns, eventType), &v1alpha1.EventType{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.EventType), err +} + +// Update takes the representation of a eventType and updates it. Returns the server's representation of the eventType, and an error, if there is any. +func (c *FakeEventTypes) Update(eventType *v1alpha1.EventType) (result *v1alpha1.EventType, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(eventtypesResource, c.ns, eventType), &v1alpha1.EventType{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.EventType), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeEventTypes) UpdateStatus(eventType *v1alpha1.EventType) (*v1alpha1.EventType, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(eventtypesResource, "status", c.ns, eventType), &v1alpha1.EventType{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.EventType), err +} + +// Delete takes name of the eventType and deletes it. Returns an error if one occurs. +func (c *FakeEventTypes) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(eventtypesResource, c.ns, name), &v1alpha1.EventType{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeEventTypes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(eventtypesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.EventTypeList{}) + return err +} + +// Patch applies the patch and returns the patched eventType. +func (c *FakeEventTypes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventType, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(eventtypesResource, c.ns, name, data, subresources...), &v1alpha1.EventType{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.EventType), err +} diff --git a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/generated_expansion.go index 00ba65313fd..c33c3d484f4 100644 --- a/pkg/client/clientset/versioned/typed/eventing/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/eventing/v1alpha1/generated_expansion.go @@ -24,6 +24,8 @@ type ChannelExpansion interface{} type ClusterChannelProvisionerExpansion interface{} +type EventTypeExpansion interface{} + type SubscriptionExpansion interface{} type TriggerExpansion interface{} diff --git a/pkg/client/informers/externalversions/eventing/v1alpha1/eventtype.go b/pkg/client/informers/externalversions/eventing/v1alpha1/eventtype.go new file mode 100644 index 00000000000..52b41ce918e --- /dev/null +++ b/pkg/client/informers/externalversions/eventing/v1alpha1/eventtype.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// EventTypeInformer provides access to a shared informer and lister for +// EventTypes. +type EventTypeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.EventTypeLister +} + +type eventTypeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewEventTypeInformer constructs a new informer for EventType type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewEventTypeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredEventTypeInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredEventTypeInformer constructs a new informer for EventType type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredEventTypeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.EventingV1alpha1().EventTypes(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.EventingV1alpha1().EventTypes(namespace).Watch(options) + }, + }, + &eventingv1alpha1.EventType{}, + resyncPeriod, + indexers, + ) +} + +func (f *eventTypeInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredEventTypeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *eventTypeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&eventingv1alpha1.EventType{}, f.defaultInformer) +} + +func (f *eventTypeInformer) Lister() v1alpha1.EventTypeLister { + return v1alpha1.NewEventTypeLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/eventing/v1alpha1/interface.go b/pkg/client/informers/externalversions/eventing/v1alpha1/interface.go index 004212a1b29..614c6d2e90a 100644 --- a/pkg/client/informers/externalversions/eventing/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/eventing/v1alpha1/interface.go @@ -30,6 +30,8 @@ type Interface interface { Channels() ChannelInformer // ClusterChannelProvisioners returns a ClusterChannelProvisionerInformer. ClusterChannelProvisioners() ClusterChannelProvisionerInformer + // EventTypes returns a EventTypeInformer. + EventTypes() EventTypeInformer // Subscriptions returns a SubscriptionInformer. Subscriptions() SubscriptionInformer // Triggers returns a TriggerInformer. @@ -62,6 +64,11 @@ func (v *version) ClusterChannelProvisioners() ClusterChannelProvisionerInformer return &clusterChannelProvisionerInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } +// EventTypes returns a EventTypeInformer. +func (v *version) EventTypes() EventTypeInformer { + return &eventTypeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // Subscriptions returns a SubscriptionInformer. func (v *version) Subscriptions() SubscriptionInformer { return &subscriptionInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 013b0b1e9fc..1c449d99645 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -60,6 +60,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().Channels().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("clusterchannelprovisioners"): return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().ClusterChannelProvisioners().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("eventtypes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().EventTypes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("subscriptions"): return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().Subscriptions().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("triggers"): diff --git a/pkg/client/listers/eventing/v1alpha1/eventtype.go b/pkg/client/listers/eventing/v1alpha1/eventtype.go new file mode 100644 index 00000000000..ad549bd981b --- /dev/null +++ b/pkg/client/listers/eventing/v1alpha1/eventtype.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// EventTypeLister helps list EventTypes. +type EventTypeLister interface { + // List lists all EventTypes in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.EventType, err error) + // EventTypes returns an object that can list and get EventTypes. + EventTypes(namespace string) EventTypeNamespaceLister + EventTypeListerExpansion +} + +// eventTypeLister implements the EventTypeLister interface. +type eventTypeLister struct { + indexer cache.Indexer +} + +// NewEventTypeLister returns a new EventTypeLister. +func NewEventTypeLister(indexer cache.Indexer) EventTypeLister { + return &eventTypeLister{indexer: indexer} +} + +// List lists all EventTypes in the indexer. +func (s *eventTypeLister) List(selector labels.Selector) (ret []*v1alpha1.EventType, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.EventType)) + }) + return ret, err +} + +// EventTypes returns an object that can list and get EventTypes. +func (s *eventTypeLister) EventTypes(namespace string) EventTypeNamespaceLister { + return eventTypeNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// EventTypeNamespaceLister helps list and get EventTypes. +type EventTypeNamespaceLister interface { + // List lists all EventTypes in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.EventType, err error) + // Get retrieves the EventType from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.EventType, error) + EventTypeNamespaceListerExpansion +} + +// eventTypeNamespaceLister implements the EventTypeNamespaceLister +// interface. +type eventTypeNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all EventTypes in the indexer for a given namespace. +func (s eventTypeNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.EventType, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.EventType)) + }) + return ret, err +} + +// Get retrieves the EventType from the indexer for a given namespace and name. +func (s eventTypeNamespaceLister) Get(name string) (*v1alpha1.EventType, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("eventtype"), name) + } + return obj.(*v1alpha1.EventType), nil +} diff --git a/pkg/client/listers/eventing/v1alpha1/expansion_generated.go b/pkg/client/listers/eventing/v1alpha1/expansion_generated.go index 4bd2b3c78eb..5f34cbf332d 100644 --- a/pkg/client/listers/eventing/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/eventing/v1alpha1/expansion_generated.go @@ -38,6 +38,14 @@ type ChannelNamespaceListerExpansion interface{} // ClusterChannelProvisionerLister. type ClusterChannelProvisionerListerExpansion interface{} +// EventTypeListerExpansion allows custom methods to be added to +// EventTypeLister. +type EventTypeListerExpansion interface{} + +// EventTypeNamespaceListerExpansion allows custom methods to be added to +// EventTypeNamespaceLister. +type EventTypeNamespaceListerExpansion interface{} + // SubscriptionListerExpansion allows custom methods to be added to // SubscriptionLister. type SubscriptionListerExpansion interface{} diff --git a/pkg/reconciler/v1alpha1/broker/resources/ingress.go b/pkg/reconciler/v1alpha1/broker/resources/ingress.go index 5721d5654a2..2910bb35ca8 100644 --- a/pkg/reconciler/v1alpha1/broker/resources/ingress.go +++ b/pkg/reconciler/v1alpha1/broker/resources/ingress.go @@ -18,6 +18,7 @@ package resources import ( "fmt" + "strconv" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -83,6 +84,18 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "ALLOW_ANY", + Value: strconv.FormatBool(args.Broker.Spec.IngressPolicy.AllowAny), + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, }, Ports: []corev1.ContainerPort{ { diff --git a/pkg/reconciler/v1alpha1/eventtype/eventtype.go b/pkg/reconciler/v1alpha1/eventtype/eventtype.go new file mode 100644 index 00000000000..c6fc6586dcc --- /dev/null +++ b/pkg/reconciler/v1alpha1/eventtype/eventtype.go @@ -0,0 +1,243 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtype + +import ( + "context" + "fmt" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/logging" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "eventtype-controller" + + // Name of the corev1.Events emitted from the reconciliation process. + eventTypeReconciled = "EventTypeReconciled" + eventTypeReconcileFailed = "EventTypeReconcileFailed" + eventTypeUpdateStatusFailed = "EventTypeUpdateStatusFailed" +) + +type reconciler struct { + client client.Client + dynamicClient dynamic.Interface + recorder record.EventRecorder + + logger *zap.Logger +} + +// Verify the struct implements reconcile.Reconciler. +var _ reconcile.Reconciler = &reconciler{} + +// ProvideController returns a function that returns an EventType controller. +func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) { + // Setup a new controller to Reconcile EventTypes. + r := &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, + } + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: r, + }) + if err != nil { + return nil, err + } + + // Watch EventTypes. + if err = c.Watch(&source.Kind{Type: &v1alpha1.EventType{}}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + // Watch for Broker changes. E.g. if the Broker is deleted, we need to reconcile its EventTypes again. + if err = c.Watch(&source.Kind{Type: &v1alpha1.Broker{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: &mapBrokerToEventTypes{r: r}}); err != nil { + return nil, err + } + + return c, nil +} + +// mapBrokerToEventTypes maps Broker changes to all the EventTypes that correspond to that Broker. +type mapBrokerToEventTypes struct { + r *reconciler +} + +func (b *mapBrokerToEventTypes) Map(o handler.MapObject) []reconcile.Request { + ctx := context.Background() + eventTypes := make([]reconcile.Request, 0) + + opts := &client.ListOptions{ + Namespace: o.Meta.GetNamespace(), + // Set Raw because if we need to get more than one page, then we will put the continue token + // into opts.Raw.Continue. + Raw: &metav1.ListOptions{}, + } + for { + etl := &v1alpha1.EventTypeList{} + if err := b.r.client.List(ctx, opts, etl); err != nil { + b.r.logger.Error("Error listing EventTypes when Broker changed. Some EventTypes may not be reconciled.", zap.Error(err), zap.Any("broker", o)) + return eventTypes + } + + for _, et := range etl.Items { + if et.Spec.Broker == o.Meta.GetName() { + eventTypes = append(eventTypes, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: et.Namespace, + Name: et.Name, + }, + }) + } + } + if etl.Continue != "" { + opts.Raw.Continue = etl.Continue + } else { + return eventTypes + } + } +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + return err +} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the EventType resource +// with the current status of the resource. +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + ctx = logging.WithLogger(ctx, r.logger.With(zap.Any("request", request))) + + eventType := &v1alpha1.EventType{} + err := r.client.Get(ctx, request.NamespacedName, eventType) + + if errors.IsNotFound(err) { + logging.FromContext(ctx).Info("Could not find EventType") + return reconcile.Result{}, nil + } + + if err != nil { + logging.FromContext(ctx).Error("Could not get EventType", zap.Error(err)) + return reconcile.Result{}, err + } + + // Reconcile this copy of the EventType and then write back any status updates regardless of + // whether the reconcile error out. + reconcileErr := r.reconcile(ctx, eventType) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling EventType", zap.Error(reconcileErr)) + r.recorder.Eventf(eventType, corev1.EventTypeWarning, eventTypeReconcileFailed, "EventType reconciliation failed: %v", reconcileErr) + } else { + logging.FromContext(ctx).Debug("EventType reconciled") + r.recorder.Event(eventType, corev1.EventTypeNormal, eventTypeReconciled, "EventType reconciled") + } + + if _, err = r.updateStatus(eventType); err != nil { + logging.FromContext(ctx).Error("Failed to update EventType status", zap.Error(err)) + r.recorder.Eventf(eventType, corev1.EventTypeWarning, eventTypeUpdateStatusFailed, "Failed to update EventType's status: %v", err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready + return reconcile.Result{}, reconcileErr +} + +func (r *reconciler) reconcile(ctx context.Context, et *v1alpha1.EventType) error { + et.Status.InitializeConditions() + + // 1. Verify the Broker exists. + // 2. Verify the Broker is ready. + + if et.DeletionTimestamp != nil { + // Everything is cleaned up by the garbage collector. + return nil + } + + b, err := r.getBroker(ctx, et) + if err != nil { + logging.FromContext(ctx).Error("Unable to get the Broker", zap.Error(err)) + et.Status.MarkBrokerDoesNotExist() + return err + } + et.Status.MarkBrokerExists() + + if !b.Status.IsReady() { + logging.FromContext(ctx).Error("Broker is not ready", zap.String("broker", b.Name)) + et.Status.MarkBrokerNotReady() + return fmt.Errorf("broker %q not ready", b.Name) + } + et.Status.MarkBrokerReady() + + return nil +} + +// updateStatus updates the EventType's status. +func (r *reconciler) updateStatus(eventType *v1alpha1.EventType) (*v1alpha1.EventType, error) { + ctx := context.TODO() + objectKey := client.ObjectKey{Namespace: eventType.Namespace, Name: eventType.Name} + latestEventType := &v1alpha1.EventType{} + + if err := r.client.Get(ctx, objectKey, latestEventType); err != nil { + return nil, err + } + + if equality.Semantic.DeepEqual(latestEventType.Status, eventType.Status) { + return eventType, nil + } + + latestEventType.Status = eventType.Status + if err := r.client.Status().Update(ctx, latestEventType); err != nil { + return nil, err + } + + return latestEventType, nil +} + +// getBroker returns the Broker for EventType 'et' if it exists, otherwise it returns an error. +func (r *reconciler) getBroker(ctx context.Context, et *v1alpha1.EventType) (*v1alpha1.Broker, error) { + b := &v1alpha1.Broker{} + name := types.NamespacedName{ + Namespace: et.Namespace, + Name: et.Spec.Broker, + } + err := r.client.Get(ctx, name, b) + return b, err +} diff --git a/pkg/reconciler/v1alpha1/eventtype/eventtype_test.go b/pkg/reconciler/v1alpha1/eventtype/eventtype_test.go new file mode 100644 index 00000000000..6f771867f1e --- /dev/null +++ b/pkg/reconciler/v1alpha1/eventtype/eventtype_test.go @@ -0,0 +1,279 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtype + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +const ( + testNS = "test-namespace" + eventTypeName = "test-eventtype" + eventTypeType = "test-type" + eventTypeBroker = "test-broker" +) + +var ( + trueVal = true + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. + deletionTime = metav1.Now().Rfc3339Copy() + + // Map of events to set test cases' expectations easier. + events = map[string]corev1.Event{ + eventTypeReconciled: {Reason: eventTypeReconciled, Type: corev1.EventTypeNormal}, + eventTypeReconcileFailed: {Reason: eventTypeReconcileFailed, Type: corev1.EventTypeWarning}, + eventTypeUpdateStatusFailed: {Reason: eventTypeUpdateStatusFailed, Type: corev1.EventTypeWarning}, + } +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) +} + +func TestProvideController(t *testing.T) { + // TODO(grantr) This needs a mock of manager.Manager. Creating a manager + // with a fake Config fails because the Manager tries to contact the + // apiserver. + + // cfg := &rest.Config{ + // Host: "http://foo:80", + // } + // + // mgr, err := manager.New(cfg, manager.Options{}) + // if err != nil { + // t.Fatalf("Error creating manager: %v", err) + // } + // + // _, err = ProvideController(mgr) + // if err != nil { + // t.Fatalf("Error in ProvideController: %v", err) + // } +} + +func TestInjectClient(t *testing.T) { + r := &reconciler{} + orig := r.client + n := fake.NewFakeClient() + if orig == n { + t.Errorf("Original and new clients are identical: %v", orig) + } + err := r.InjectClient(n) + if err != nil { + t.Errorf("Unexpected error injecting the client: %v", err) + } + if n != r.client { + t.Errorf("Unexpected client. Expected: '%v'. Actual: '%v'", n, r.client) + } +} + +func TestInjectConfig(t *testing.T) { + r := &reconciler{} + wantCfg := &rest.Config{ + Host: "http://foo", + } + + err := r.InjectConfig(wantCfg) + if err != nil { + t.Fatalf("Unexpected error injecting the config: %v", err) + } + + wantDynClient, err := dynamic.NewForConfig(wantCfg) + if err != nil { + t.Fatalf("Unexpected error generating dynamic client: %v", err) + } + + // Since dynamicClient doesn't export any fields, we can only test its type. + switch r.dynamicClient.(type) { + case dynamic.Interface: + // ok + default: + t.Errorf("Unexpected dynamicClient type. Expected: %T, Got: %T", wantDynClient, r.dynamicClient) + } +} + +func TestReconcile(t *testing.T) { + testCases := []controllertesting.TestCase{ + { + Name: "EventType not found", + }, + { + Name: "Get EventType error", + Scheme: scheme.Scheme, + Mocks: controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*v1alpha1.EventType); ok { + return controllertesting.Handled, errors.New("test error getting the EventType") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error getting the EventType", + }, + { + Name: "EventType being deleted", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeDeletingEventType(), + }, + WantEvent: []corev1.Event{events[eventTypeReconciled]}, + }, + { + Name: "Get Broker error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeEventType(), + }, + Mocks: controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*v1alpha1.Broker); ok { + return controllertesting.Handled, errors.New("test error getting broker") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error getting broker", + WantEvent: []corev1.Event{events[eventTypeReconcileFailed]}, + }, + { + Name: "Broker not ready", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeEventType(), + makeBroker(), + }, + WantErrMsg: `broker "` + eventTypeBroker + `" not ready`, + WantEvent: []corev1.Event{events[eventTypeReconcileFailed]}, + }, + { + Name: "EventType reconciliation success", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeEventType(), + makeBrokerReady(), + }, + WantEvent: []corev1.Event{events[eventTypeReconciled]}, + WantPresent: []runtime.Object{ + makeReadyEventType(), + }, + }, + } + for _, tc := range testCases { + c := tc.GetClient() + dc := tc.GetDynamicClient() + recorder := tc.GetEventRecorder() + + r := &reconciler{ + client: c, + dynamicClient: dc, + recorder: recorder, + logger: zap.NewNop(), + } + tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, eventTypeName) + tc.IgnoreTimes = true + t.Run(tc.Name, tc.Runner(t, r, c, recorder)) + } +} + +func makeEventType() *v1alpha1.EventType { + return &v1alpha1.EventType{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "EventType", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: eventTypeName, + }, + Spec: v1alpha1.EventTypeSpec{ + Broker: eventTypeBroker, + Type: eventTypeType, + }, + } +} + +func makeReadyEventType() *v1alpha1.EventType { + t := makeEventType() + t.Status.InitializeConditions() + t.Status.MarkBrokerExists() + t.Status.MarkBrokerReady() + return t +} + +func makeDeletingEventType() *v1alpha1.EventType { + et := makeReadyEventType() + et.DeletionTimestamp = &deletionTime + return et +} + +func makeBroker() *v1alpha1.Broker { + return &v1alpha1.Broker{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Broker", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: eventTypeBroker, + }, + Spec: v1alpha1.BrokerSpec{ + ChannelTemplate: &v1alpha1.ChannelSpec{ + Provisioner: makeChannelProvisioner(), + }, + }, + } +} + +func makeBrokerReady() *v1alpha1.Broker { + b := makeBroker() + b.Status.InitializeConditions() + b.Status.MarkTriggerChannelReady() + b.Status.MarkFilterReady() + b.Status.MarkIngressReady() + b.Status.SetAddress("test-address") + b.Status.MarkIngressChannelReady() + b.Status.MarkIngressSubscriptionReady() + return b +} + +func makeChannelProvisioner() *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "ClusterChannelProvisioner", + Name: "my-provisioner", + } +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 33482fc0523..ea067aaa43c 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -20,8 +20,11 @@ import ( "bufio" "io" "os" + "regexp" "strings" "sync" + + "k8s.io/apimachinery/pkg/util/validation" ) const ( @@ -32,6 +35,9 @@ const ( var ( domainName string once sync.Once + + // Only allow alphanumeric, '-' or '.'. + validChars = regexp.MustCompile(`[^-\.a-z0-9]+`) ) // GetClusterDomainName returns cluster's domain name or an error @@ -67,3 +73,19 @@ func getClusterDomainName(r io.Reader) string { // For all abnormal cases return default domain name return defaultDomainName } + +// Converts 'name' to a valid DNS1123 subdomain, required for object names in K8s. +func ToDNS1123Subdomain(name string) string { + // If it is not a valid DNS1123 subdomain, make it a valid one. + if msgs := validation.IsDNS1123Subdomain(name); len(msgs) != 0 { + // If the length exceeds the max, cut it and leave some room for a potential generated UUID. + if len(name) > validation.DNS1123SubdomainMaxLength { + name = name[:validation.DNS1123SubdomainMaxLength-10] + } + name = strings.ToLower(name) + name = validChars.ReplaceAllString(name, "") + // Only start/end with alphanumeric. + name = strings.Trim(name, "-.") + } + return name +} diff --git a/test/builders.go b/test/builders.go index 6b48b2f6f1e..84d0dbd1265 100644 --- a/test/builders.go +++ b/test/builders.go @@ -81,3 +81,91 @@ func (b *TriggerBuilder) SubscriberSvc(svcName string) *TriggerBuilder { } return b } + +// Builder for broker objects. +type BrokerBuilder struct { + *eventingv1alpha1.Broker +} + +func NewBrokerBuilder(name, namespace string) *BrokerBuilder { + broker := &eventingv1alpha1.Broker{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: eventingv1alpha1.BrokerSpec{ + IngressPolicy: &eventingv1alpha1.IngressPolicySpec{ + AllowAny: true, + }, + }, + } + + return &BrokerBuilder{ + Broker: broker, + } +} + +func (b *BrokerBuilder) Build() *eventingv1alpha1.Broker { + return b.Broker.DeepCopy() +} + +func (b *BrokerBuilder) IngressPolicy(policy *eventingv1alpha1.IngressPolicySpec) *BrokerBuilder { + b.Broker.Spec.IngressPolicy = policy + return b +} + +// Builder for EventType objects. +type EventTypeBuilder struct { + *eventingv1alpha1.EventType +} + +func NewEventTypeBuilder(name, namespace string) *EventTypeBuilder { + eventType := &eventingv1alpha1.EventType{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "EventType", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: eventingv1alpha1.EventTypeSpec{ + Broker: "default", + Type: CloudEventDefaultType, + Source: CloudEventDefaultSource, + Schema: "", + }, + } + + return &EventTypeBuilder{ + EventType: eventType, + } +} + +func (b *EventTypeBuilder) Build() *eventingv1alpha1.EventType { + return b.EventType.DeepCopy() +} + +func (b *EventTypeBuilder) Type(eventType string) *EventTypeBuilder { + b.Spec.Type = eventType + return b +} + +func (b *EventTypeBuilder) Source(eventSource string) *EventTypeBuilder { + b.Spec.Source = eventSource + return b +} + +func (b *EventTypeBuilder) Broker(brokerName string) *EventTypeBuilder { + b.Spec.Broker = brokerName + return b +} + +func (b *EventTypeBuilder) Schema(schema string) *EventTypeBuilder { + b.Spec.Schema = schema + return b +} diff --git a/test/crd.go b/test/crd.go index 046532f5f7b..807977a2ff8 100644 --- a/test/crd.go +++ b/test/crd.go @@ -118,6 +118,7 @@ const ( CloudEventEncodingStructured = "structured" CloudEventDefaultEncoding = CloudEventEncodingBinary CloudEventDefaultType = "dev.knative.test.event" + CloudEventDefaultSource = "dev.knative.test.source" ) // EventSenderPod creates a Pod that sends a single event to the given address. diff --git a/test/crd_checks.go b/test/crd_checks.go index d165151b783..574b45260a4 100644 --- a/test/crd_checks.go +++ b/test/crd_checks.go @@ -115,6 +115,29 @@ func WaitForTriggerState(client eventingclient.TriggerInterface, name string, in }) } +// WaitForEventTypeState polls the status of the EventType called name from client +// every interval until inState returns `true` indicating it is done, returns an +// error or timeout. desc will be used to name the metric that is emitted to +// track how long it took for name to get into the state checked by inState. +func WaitForEventTypeState(client eventingclient.EventTypeInterface, name string, inState func(r *eventingv1alpha1.EventType) (bool, error), desc string, timeout time.Duration) error { + metricName := fmt.Sprintf("WaitForEventTypeState/%s/%s", name, desc) + _, span := trace.StartSpan(context.Background(), metricName) + defer span.End() + + return wait.PollImmediate(interval, timeout, func() (bool, error) { + r, err := client.Get(name, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + // Return false as we are not done yet. + // We swallow the error to keep on polling, + // as the event + return false, nil + } else if err != nil { + return true, err + } + return inState(r) + }) +} + // WaitForTriggersListState polls the status of the TriggerList // from client every interval until inState returns `true` indicating it // is done, returns an error or timeout. desc will be used to name the metric @@ -132,3 +155,25 @@ func WaitForTriggersListState(clients eventingclient.TriggerInterface, inState f return inState(t) }) } + +// WaitForEventTypeListState polls the status of the EventTypeList +// from client every interval until inState returns `true` indicating it +// is done, returns an error or timeout. desc will be used to name the metric +// that is emitted to track how long it took to get into the state checked by inState. +func WaitForEventTypeListState(clients eventingclient.EventTypeInterface, inState func(t *eventingv1alpha1.EventTypeList) (bool, error), desc string, timeout time.Duration) error { + metricName := fmt.Sprintf("WaitForEventTypeListState/%s", desc) + _, span := trace.StartSpan(context.Background(), metricName) + defer span.End() + + return wait.PollImmediate(interval, timeout, func() (bool, error) { + l, err := clients.List(metav1.ListOptions{}) + if err != nil { + return true, err + } + // If there are no items, then we return false as we are not done yet. + if len(l.Items) == 0 { + return false, nil + } + return inState(l) + }) +} diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index f559c3935e6..10df14c18d6 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -249,6 +249,17 @@ func CreateTrigger(clients *test.Clients, trigger *v1alpha1.Trigger, logf loggin return nil } +// CreateEventType will create an EventType. +func CreateEventType(clients *test.Clients, eventType *v1alpha1.EventType, logf logging.FormatLogger, cleaner *test.Cleaner) error { + eventTypes := clients.Eventing.EventingV1alpha1().EventTypes(eventType.Namespace) + res, err := eventTypes.Create(eventType) + if err != nil { + return err + } + cleaner.Add(v1alpha1.SchemeGroupVersion.Group, v1alpha1.SchemeGroupVersion.Version, "eventtypes", eventType.Namespace, res.ObjectMeta.Name) + return nil +} + // WithTriggerReady creates a Trigger and waits until it is Ready. func WithTriggerReady(clients *test.Clients, trigger *v1alpha1.Trigger, logf logging.FormatLogger, cleaner *test.Cleaner) error { if err := CreateTrigger(clients, trigger, logf, cleaner); err != nil { @@ -269,6 +280,29 @@ func WithTriggerReady(clients *test.Clients, trigger *v1alpha1.Trigger, logf log return nil } +// WithEventTypeReady creates an EventType and waits until it is Ready. +func WithEventTypeReady(clients *test.Clients, eventType *v1alpha1.EventType, logf logging.FormatLogger, cleaner *test.Cleaner) error { + if err := CreateEventType(clients, eventType, logf, cleaner); err != nil { + return err + } + return WaitForEventTypeReady(clients, eventType) +} + +// WaitForEventTypeReady waits until the EventType is Ready. +func WaitForEventTypeReady(clients *test.Clients, eventType *v1alpha1.EventType) error { + eventTypes := clients.Eventing.EventingV1alpha1().EventTypes(eventType.Namespace) + if err := test.WaitForEventTypeState(eventTypes, eventType.Name, test.IsEventTypeReady, "EventTypeIsReady", time.Minute); err != nil { + return err + } + // Update the given object so they'll reflect the ready state. + updatedEventType, err := eventTypes.Get(eventType.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updatedEventType.DeepCopyInto(eventType) + return nil +} + // CreateServiceAccount will create a service account. func CreateServiceAccount(clients *test.Clients, sa *corev1.ServiceAccount, _ logging.FormatLogger, cleaner *test.Cleaner) error { namespace := sa.Namespace @@ -450,6 +484,15 @@ func WaitForAllTriggersReady(clients *test.Clients, namespace string, logf loggi return nil } +// WaitForAllEventTypesReady will wait until all EventTypes in the given namespace are ready. +func WaitForAllEventTypesReady(clients *test.Clients, logf logging.FormatLogger, namespace string) error { + eventTypes := clients.Eventing.EventingV1alpha1().EventTypes(namespace) + if err := test.WaitForEventTypeListState(eventTypes, test.EventTypesReady, "EventTypeIsReady", timeout); err != nil { + return err + } + return nil +} + // CreateNamespaceIfNeeded creates a new namespace if it does not exist. func CreateNamespaceIfNeeded(t *testing.T, clients *test.Clients, namespace string, logf logging.FormatLogger) { nsSpec, err := clients.Kube.Kube.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}) diff --git a/test/e2e/registry_test.go b/test/e2e/registry_test.go new file mode 100644 index 00000000000..f012d1a82b5 --- /dev/null +++ b/test/e2e/registry_test.go @@ -0,0 +1,203 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + "time" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + "k8s.io/apimachinery/pkg/util/uuid" +) + +// Helper to setup test data and expectations. +type testFixture struct { + ingressPolicy *v1alpha1.IngressPolicySpec + preRegisterEvent bool + wantEventDelivered bool + wantEventRegistered bool +} + +func TestRegistryBrokerAllowAnyAccept(t *testing.T) { + fixture := &testFixture{ + ingressPolicy: &v1alpha1.IngressPolicySpec{ + AllowAny: true, + }, + wantEventDelivered: true, + } + Registry(t, fixture) +} + +func TestRegistryBrokerAllowRegisteredAccept(t *testing.T) { + fixture := &testFixture{ + ingressPolicy: &v1alpha1.IngressPolicySpec{ + AllowAny: false, + }, + preRegisterEvent: true, + wantEventDelivered: true, + wantEventRegistered: true, + } + Registry(t, fixture) +} + +func TestRegistryBrokerAllowRegisteredNotAccept(t *testing.T) { + fixture := &testFixture{ + ingressPolicy: &v1alpha1.IngressPolicySpec{ + AllowAny: false, + }, + } + Registry(t, fixture) +} + +func TestRegistryBrokerAutoAddAccept(t *testing.T) { + fixture := &testFixture{ + ingressPolicy: &v1alpha1.IngressPolicySpec{ + AutoAdd: true, + }, + wantEventDelivered: true, + wantEventRegistered: true, + } + Registry(t, fixture) +} + +func Registry(t *testing.T, fixture *testFixture) { + clients, cleaner := Setup(t, t.Logf) + + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) + + defer cleanupNS() + defer TearDown(clients, cleaner, t.Logf) + + // Define the constants here to avoid conflicts with other e2e tests. + const ( + brokerName = "test-broker" + triggerName = "test-trigger" + eventTypeName = "test-eventtype" + subscriberName = "test-dumper" + waitTimeForBrokerPodsRunning = 30 * time.Second + ) + + t.Logf("Labeling Namespace %s", ns) + err := LabelNamespace(clients, t.Logf, map[string]string{"knative-eventing-injection": "enabled"}) + if err != nil { + t.Fatalf("Error labeling Namespace: %v", err) + } + t.Logf("Namespace %s labeled", ns) + + broker := test.NewBrokerBuilder(brokerName, ns). + IngressPolicy(fixture.ingressPolicy). + Build() + t.Logf("Creating and waiting for Broker %s ready", broker.Name) + err = WithBrokerReady(clients, broker, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error creating and waiting for Broker ready: %v", err) + } + brokerUrl := fmt.Sprintf("http://%s", broker.Status.Address.Hostname) + t.Logf("Broker created and ready: %q", brokerUrl) + + t.Logf("Creating Subscriber Pod and Service") + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + subscriberPod := test.EventLoggerPod(subscriberName, ns, selector) + subscriberSvc := test.Service(subscriberName, ns, selector) + subscriberPod, err = CreatePodAndServiceReady(clients, subscriberPod, subscriberSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create Subscriber Pod and Service, and get them ready: %v", err) + } + t.Logf("Subscriber Pod and Service created and ready") + + trigger := test.NewTriggerBuilder(triggerName, ns). + Broker(broker.Name). + SubscriberSvc(subscriberSvc.Name). + Build() + t.Logf("Creating and waiting for Trigger %s ready", trigger.Name) + err = WithTriggerReady(clients, trigger, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error creating and waiting for Trigger ready: %v", err) + } + t.Logf("Trigger created and ready") + + eventType := test.NewEventTypeBuilder(eventTypeName, ns). + Broker(brokerName). + // use the default values for source, type and schema. + Build() + if fixture.preRegisterEvent { + t.Logf("Creating and waiting for EventType %s ready", eventType.Name) + err = WithEventTypeReady(clients, eventType, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error creating and waiting for EventType ready: %v", err) + } + t.Logf("EventType created and ready") + } + + // We notice some crashLoopBacks in the Broker's filter and ingress pod creation. + // We then delay the creation of the sender pod in order not to miss the event. + t.Logf("Waiting for Broker's filter and ingress POD to become running") + time.Sleep(waitTimeForBrokerPodsRunning) + + body := fmt.Sprintf("Registry %s", uuid.NewUUID()) + cloudEvent := &test.CloudEvent{ + Source: test.CloudEventDefaultSource, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + } + t.Logf("Creating Sender Pod") + senderPod := test.EventSenderPod("sender", ns, brokerUrl, cloudEvent) + if err := CreatePod(clients, senderPod, t.Logf, cleaner); err != nil { + t.Fatalf("Error creating event sender Pod: %v", err) + } + t.Logf("Sender Pod created. Waiting for it to be running") + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for Sender Pod to become running: %v", err) + } + t.Logf("Sender Pod running") + + if fixture.wantEventDelivered { + t.Logf("Verifying Event delivered") + if err := WaitForLogContents(clients, t.Logf, subscriberName, subscriberPod.Spec.Containers[0].Name, ns, []string{body}); err != nil { + t.Fatalf("Event not found in logs of Subscriber Pod %q: %v", subscriberName, err) + } + } else { + t.Logf("Verifying Event not delivered") + found, err := FindAnyLogContents(clients, t.Logf, subscriberName, subscriberPod.Spec.Containers[0].Name, ns, []string{body}) + if err != nil { + t.Fatalf("Failed querying to find log contents in Subscriber Pod %q: %v", subscriberName, err) + } + if found { + t.Fatalf("Unexpected event found in logs of Subscriber Pod %q", subscriberName) + } + } + + // As the EventType might have been auto-generated, we cannot query for the particular EventType + // with the name given by the constant eventTypeName. We just query for all of them, and see if all are ready. + err = WaitForAllEventTypesReady(clients, t.Logf, ns) + if fixture.wantEventRegistered { + t.Logf("Verifying Event registered") + if err != nil { + t.Fatalf("EventType %q not ready: %v", eventType.Name, err) + } + } else { + t.Logf("Verifying Event not registered") + if err == nil { + t.Fatalf("EventType %q registered and ready, but expected not to be", eventType.Name) + } + } +} diff --git a/test/states.go b/test/states.go index a0466e5dc18..7599f5717e8 100644 --- a/test/states.go +++ b/test/states.go @@ -47,6 +47,12 @@ func IsTriggerReady(t *eventingv1alpha1.Trigger) (bool, error) { return t.Status.IsReady(), nil } +// IsEventTypeReady will check the status conditions of the EventType and +// return true if the EventType is ready. +func IsEventTypeReady(et *eventingv1alpha1.EventType) (bool, error) { + return et.Status.IsReady(), nil +} + // TriggersReady will check the status conditions of the trigger list and return true // if all triggers are Ready. func TriggersReady(triggerList *eventingv1alpha1.TriggerList) (bool, error) { @@ -63,6 +69,22 @@ func TriggersReady(triggerList *eventingv1alpha1.TriggerList) (bool, error) { return true, nil } +// EventTypesReady will check the status conditions of the EventTypeList and return true +// if all EventTypes are Ready. +func EventTypesReady(eventTypeList *eventingv1alpha1.EventTypeList) (bool, error) { + var names []string + for _, t := range eventTypeList.Items { + names = append(names, t.Name) + } + log.Printf("Checking event types: %v", names) + for _, eventType := range eventTypeList.Items { + if !eventType.Status.IsReady() { + return false, nil + } + } + return true, nil +} + // PodsRunning will check the status conditions of the pod list and return true // if all pods are Running. func PodsRunning(podList *corev1.PodList) (bool, error) { diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index dcf5fffc254..2a4f2a03a7b 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -3454,6 +3454,31 @@ SOFTWARE. +=========================================================== +Import: github.com/knative/eventing/vendor/github.com/kelseyhightower/envconfig + +Copyright (c) 2013 Kelsey Hightower + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + + =========================================================== Import: github.com/knative/eventing/vendor/github.com/knative/pkg diff --git a/vendor/github.com/kelseyhightower/envconfig/LICENSE b/vendor/github.com/kelseyhightower/envconfig/LICENSE new file mode 100644 index 00000000000..4bfa7a84d81 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2013 Kelsey Hightower + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/kelseyhightower/envconfig/doc.go b/vendor/github.com/kelseyhightower/envconfig/doc.go new file mode 100644 index 00000000000..f28561cd1cb --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) 2013 Kelsey Hightower. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +// Package envconfig implements decoding of environment variables based on a user +// defined specification. A typical use is using environment variables for +// configuration settings. +package envconfig diff --git a/vendor/github.com/kelseyhightower/envconfig/env_os.go b/vendor/github.com/kelseyhightower/envconfig/env_os.go new file mode 100644 index 00000000000..a6a014a2b47 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/env_os.go @@ -0,0 +1,7 @@ +// +build appengine + +package envconfig + +import "os" + +var lookupEnv = os.LookupEnv diff --git a/vendor/github.com/kelseyhightower/envconfig/env_syscall.go b/vendor/github.com/kelseyhightower/envconfig/env_syscall.go new file mode 100644 index 00000000000..9d98085b99f --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/env_syscall.go @@ -0,0 +1,7 @@ +// +build !appengine + +package envconfig + +import "syscall" + +var lookupEnv = syscall.Getenv diff --git a/vendor/github.com/kelseyhightower/envconfig/envconfig.go b/vendor/github.com/kelseyhightower/envconfig/envconfig.go new file mode 100644 index 00000000000..892d74699f6 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/envconfig.go @@ -0,0 +1,319 @@ +// Copyright (c) 2013 Kelsey Hightower. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +package envconfig + +import ( + "encoding" + "errors" + "fmt" + "reflect" + "regexp" + "strconv" + "strings" + "time" +) + +// ErrInvalidSpecification indicates that a specification is of the wrong type. +var ErrInvalidSpecification = errors.New("specification must be a struct pointer") + +// A ParseError occurs when an environment variable cannot be converted to +// the type required by a struct field during assignment. +type ParseError struct { + KeyName string + FieldName string + TypeName string + Value string + Err error +} + +// Decoder has the same semantics as Setter, but takes higher precedence. +// It is provided for historical compatibility. +type Decoder interface { + Decode(value string) error +} + +// Setter is implemented by types can self-deserialize values. +// Any type that implements flag.Value also implements Setter. +type Setter interface { + Set(value string) error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("envconfig.Process: assigning %[1]s to %[2]s: converting '%[3]s' to type %[4]s. details: %[5]s", e.KeyName, e.FieldName, e.Value, e.TypeName, e.Err) +} + +// varInfo maintains information about the configuration variable +type varInfo struct { + Name string + Alt string + Key string + Field reflect.Value + Tags reflect.StructTag +} + +// GatherInfo gathers information about the specified struct +func gatherInfo(prefix string, spec interface{}) ([]varInfo, error) { + expr := regexp.MustCompile("([^A-Z]+|[A-Z][^A-Z]+|[A-Z]+)") + s := reflect.ValueOf(spec) + + if s.Kind() != reflect.Ptr { + return nil, ErrInvalidSpecification + } + s = s.Elem() + if s.Kind() != reflect.Struct { + return nil, ErrInvalidSpecification + } + typeOfSpec := s.Type() + + // over allocate an info array, we will extend if needed later + infos := make([]varInfo, 0, s.NumField()) + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + ftype := typeOfSpec.Field(i) + if !f.CanSet() || ftype.Tag.Get("ignored") == "true" { + continue + } + + for f.Kind() == reflect.Ptr { + if f.IsNil() { + if f.Type().Elem().Kind() != reflect.Struct { + // nil pointer to a non-struct: leave it alone + break + } + // nil pointer to struct: create a zero instance + f.Set(reflect.New(f.Type().Elem())) + } + f = f.Elem() + } + + // Capture information about the config variable + info := varInfo{ + Name: ftype.Name, + Field: f, + Tags: ftype.Tag, + Alt: strings.ToUpper(ftype.Tag.Get("envconfig")), + } + + // Default to the field name as the env var name (will be upcased) + info.Key = info.Name + + // Best effort to un-pick camel casing as separate words + if ftype.Tag.Get("split_words") == "true" { + words := expr.FindAllStringSubmatch(ftype.Name, -1) + if len(words) > 0 { + var name []string + for _, words := range words { + name = append(name, words[0]) + } + + info.Key = strings.Join(name, "_") + } + } + if info.Alt != "" { + info.Key = info.Alt + } + if prefix != "" { + info.Key = fmt.Sprintf("%s_%s", prefix, info.Key) + } + info.Key = strings.ToUpper(info.Key) + infos = append(infos, info) + + if f.Kind() == reflect.Struct { + // honor Decode if present + if decoderFrom(f) == nil && setterFrom(f) == nil && textUnmarshaler(f) == nil { + innerPrefix := prefix + if !ftype.Anonymous { + innerPrefix = info.Key + } + + embeddedPtr := f.Addr().Interface() + embeddedInfos, err := gatherInfo(innerPrefix, embeddedPtr) + if err != nil { + return nil, err + } + infos = append(infos[:len(infos)-1], embeddedInfos...) + + continue + } + } + } + return infos, nil +} + +// Process populates the specified struct based on environment variables +func Process(prefix string, spec interface{}) error { + infos, err := gatherInfo(prefix, spec) + + for _, info := range infos { + + // `os.Getenv` cannot differentiate between an explicitly set empty value + // and an unset value. `os.LookupEnv` is preferred to `syscall.Getenv`, + // but it is only available in go1.5 or newer. We're using Go build tags + // here to use os.LookupEnv for >=go1.5 + value, ok := lookupEnv(info.Key) + if !ok && info.Alt != "" { + value, ok = lookupEnv(info.Alt) + } + + def := info.Tags.Get("default") + if def != "" && !ok { + value = def + } + + req := info.Tags.Get("required") + if !ok && def == "" { + if req == "true" { + return fmt.Errorf("required key %s missing value", info.Key) + } + continue + } + + err := processField(value, info.Field) + if err != nil { + return &ParseError{ + KeyName: info.Key, + FieldName: info.Name, + TypeName: info.Field.Type().String(), + Value: value, + Err: err, + } + } + } + + return err +} + +// MustProcess is the same as Process but panics if an error occurs +func MustProcess(prefix string, spec interface{}) { + if err := Process(prefix, spec); err != nil { + panic(err) + } +} + +func processField(value string, field reflect.Value) error { + typ := field.Type() + + decoder := decoderFrom(field) + if decoder != nil { + return decoder.Decode(value) + } + // look for Set method if Decode not defined + setter := setterFrom(field) + if setter != nil { + return setter.Set(value) + } + + if t := textUnmarshaler(field); t != nil { + return t.UnmarshalText([]byte(value)) + } + + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + if field.IsNil() { + field.Set(reflect.New(typ)) + } + field = field.Elem() + } + + switch typ.Kind() { + case reflect.String: + field.SetString(value) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + var ( + val int64 + err error + ) + if field.Kind() == reflect.Int64 && typ.PkgPath() == "time" && typ.Name() == "Duration" { + var d time.Duration + d, err = time.ParseDuration(value) + val = int64(d) + } else { + val, err = strconv.ParseInt(value, 0, typ.Bits()) + } + if err != nil { + return err + } + + field.SetInt(val) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + val, err := strconv.ParseUint(value, 0, typ.Bits()) + if err != nil { + return err + } + field.SetUint(val) + case reflect.Bool: + val, err := strconv.ParseBool(value) + if err != nil { + return err + } + field.SetBool(val) + case reflect.Float32, reflect.Float64: + val, err := strconv.ParseFloat(value, typ.Bits()) + if err != nil { + return err + } + field.SetFloat(val) + case reflect.Slice: + vals := strings.Split(value, ",") + sl := reflect.MakeSlice(typ, len(vals), len(vals)) + for i, val := range vals { + err := processField(val, sl.Index(i)) + if err != nil { + return err + } + } + field.Set(sl) + case reflect.Map: + pairs := strings.Split(value, ",") + mp := reflect.MakeMap(typ) + for _, pair := range pairs { + kvpair := strings.Split(pair, ":") + if len(kvpair) != 2 { + return fmt.Errorf("invalid map item: %q", pair) + } + k := reflect.New(typ.Key()).Elem() + err := processField(kvpair[0], k) + if err != nil { + return err + } + v := reflect.New(typ.Elem()).Elem() + err = processField(kvpair[1], v) + if err != nil { + return err + } + mp.SetMapIndex(k, v) + } + field.Set(mp) + } + + return nil +} + +func interfaceFrom(field reflect.Value, fn func(interface{}, *bool)) { + // it may be impossible for a struct field to fail this check + if !field.CanInterface() { + return + } + var ok bool + fn(field.Interface(), &ok) + if !ok && field.CanAddr() { + fn(field.Addr().Interface(), &ok) + } +} + +func decoderFrom(field reflect.Value) (d Decoder) { + interfaceFrom(field, func(v interface{}, ok *bool) { d, *ok = v.(Decoder) }) + return d +} + +func setterFrom(field reflect.Value) (s Setter) { + interfaceFrom(field, func(v interface{}, ok *bool) { s, *ok = v.(Setter) }) + return s +} + +func textUnmarshaler(field reflect.Value) (t encoding.TextUnmarshaler) { + interfaceFrom(field, func(v interface{}, ok *bool) { t, *ok = v.(encoding.TextUnmarshaler) }) + return t +} diff --git a/vendor/github.com/kelseyhightower/envconfig/usage.go b/vendor/github.com/kelseyhightower/envconfig/usage.go new file mode 100644 index 00000000000..184635380f2 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/usage.go @@ -0,0 +1,158 @@ +// Copyright (c) 2016 Kelsey Hightower and others. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +package envconfig + +import ( + "encoding" + "fmt" + "io" + "os" + "reflect" + "strconv" + "strings" + "text/tabwriter" + "text/template" +) + +const ( + // DefaultListFormat constant to use to display usage in a list format + DefaultListFormat = `This application is configured via the environment. The following environment +variables can be used: +{{range .}} +{{usage_key .}} + [description] {{usage_description .}} + [type] {{usage_type .}} + [default] {{usage_default .}} + [required] {{usage_required .}}{{end}} +` + // DefaultTableFormat constant to use to display usage in a tabluar format + DefaultTableFormat = `This application is configured via the environment. The following environment +variables can be used: + +KEY TYPE DEFAULT REQUIRED DESCRIPTION +{{range .}}{{usage_key .}} {{usage_type .}} {{usage_default .}} {{usage_required .}} {{usage_description .}} +{{end}}` +) + +var ( + decoderType = reflect.TypeOf((*Decoder)(nil)).Elem() + setterType = reflect.TypeOf((*Setter)(nil)).Elem() + unmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() +) + +func implementsInterface(t reflect.Type) bool { + return t.Implements(decoderType) || + reflect.PtrTo(t).Implements(decoderType) || + t.Implements(setterType) || + reflect.PtrTo(t).Implements(setterType) || + t.Implements(unmarshalerType) || + reflect.PtrTo(t).Implements(unmarshalerType) +} + +// toTypeDescription converts Go types into a human readable description +func toTypeDescription(t reflect.Type) string { + switch t.Kind() { + case reflect.Array, reflect.Slice: + return fmt.Sprintf("Comma-separated list of %s", toTypeDescription(t.Elem())) + case reflect.Map: + return fmt.Sprintf( + "Comma-separated list of %s:%s pairs", + toTypeDescription(t.Key()), + toTypeDescription(t.Elem()), + ) + case reflect.Ptr: + return toTypeDescription(t.Elem()) + case reflect.Struct: + if implementsInterface(t) && t.Name() != "" { + return t.Name() + } + return "" + case reflect.String: + name := t.Name() + if name != "" && name != "string" { + return name + } + return "String" + case reflect.Bool: + name := t.Name() + if name != "" && name != "bool" { + return name + } + return "True or False" + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "int") { + return name + } + return "Integer" + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "uint") { + return name + } + return "Unsigned Integer" + case reflect.Float32, reflect.Float64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "float") { + return name + } + return "Float" + } + return fmt.Sprintf("%+v", t) +} + +// Usage writes usage information to stderr using the default header and table format +func Usage(prefix string, spec interface{}) error { + // The default is to output the usage information as a table + // Create tabwriter instance to support table output + tabs := tabwriter.NewWriter(os.Stdout, 1, 0, 4, ' ', 0) + + err := Usagef(prefix, spec, tabs, DefaultTableFormat) + tabs.Flush() + return err +} + +// Usagef writes usage information to the specified io.Writer using the specifed template specification +func Usagef(prefix string, spec interface{}, out io.Writer, format string) error { + + // Specify the default usage template functions + functions := template.FuncMap{ + "usage_key": func(v varInfo) string { return v.Key }, + "usage_description": func(v varInfo) string { return v.Tags.Get("desc") }, + "usage_type": func(v varInfo) string { return toTypeDescription(v.Field.Type()) }, + "usage_default": func(v varInfo) string { return v.Tags.Get("default") }, + "usage_required": func(v varInfo) (string, error) { + req := v.Tags.Get("required") + if req != "" { + reqB, err := strconv.ParseBool(req) + if err != nil { + return "", err + } + if reqB { + req = "true" + } + } + return req, nil + }, + } + + tmpl, err := template.New("envconfig").Funcs(functions).Parse(format) + if err != nil { + return err + } + + return Usaget(prefix, spec, out, tmpl) +} + +// Usaget writes usage information to the specified io.Writer using the specified template +func Usaget(prefix string, spec interface{}, out io.Writer, tmpl *template.Template) error { + // gather first + infos, err := gatherInfo(prefix, spec) + if err != nil { + return err + } + + return tmpl.Execute(out, infos) +}