diff --git a/Gopkg.lock b/Gopkg.lock index 5abe515eb84..56135cbdeef 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1374,7 +1374,7 @@ [[projects]] branch = "master" - digest = "1:94d7cfde4eee652c089b47f0200b86bc28e618de18a2e5368b0bbc8e2b601026" + digest = "1:e41775799602da16406d666dcc9dbe8926f1f0655c3a3aab518825ba2fd6e109" name = "knative.dev/pkg" packages = [ "apis", @@ -1479,7 +1479,7 @@ "webhook/resourcesemantics/validation", ] pruneopts = "T" - revision = "c13e86e2d4f484f1dd3b8ca6474994c16da74f20" + revision = "381424b19753db69863563bd6b280d582e5b14db" [[projects]] branch = "master" @@ -1490,7 +1490,7 @@ "tools/dep-collector", ] pruneopts = "UT" - revision = "54bbe34c6c26f6251a92d48a546166fc28d434a6" + revision = "0bc1e8bb1582a8738da1b5c485084a84bf76f17c" [[projects]] digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c" @@ -1551,7 +1551,6 @@ "k8s.io/api/core/v1", "k8s.io/api/rbac/v1", "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1", - "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset", "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake", "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1", "k8s.io/apimachinery/pkg/api/equality", @@ -1605,6 +1604,7 @@ "knative.dev/pkg/apis/duck/v1", "knative.dev/pkg/apis/duck/v1alpha1", "knative.dev/pkg/apis/duck/v1beta1", + "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1beta1/customresourcedefinition", "knative.dev/pkg/client/injection/ducks/duck/v1/addressable", "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake", "knative.dev/pkg/client/injection/ducks/duck/v1/conditions", diff --git a/cmd/controller/main.go b/cmd/controller/main.go index f1252bfbdd1..b11bcd72b6f 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -28,6 +28,7 @@ import ( "knative.dev/eventing/pkg/reconciler/parallel" pingsource "knative.dev/eventing/pkg/reconciler/pingsource/controller" "knative.dev/eventing/pkg/reconciler/sequence" + sourcecrd "knative.dev/eventing/pkg/reconciler/source/crd" "knative.dev/eventing/pkg/reconciler/subscription" "knative.dev/eventing/pkg/reconciler/trigger" ) @@ -50,5 +51,7 @@ func main() { // Sources apiserversource.NewController, pingsource.NewController, + // Sources CRD + sourcecrd.NewController, ) } diff --git a/config/core/resources/eventtype.yaml b/config/core/resources/eventtype.yaml index 8bd41b11333..915f3675944 100644 --- a/config/core/resources/eventtype.yaml +++ b/config/core/resources/eventtype.yaml @@ -56,6 +56,7 @@ spec: - name: Description type: string JSONPath: ".spec.description" + # TODO remove Status https://github.com/knative/eventing/issues/2750 - name: Ready type: string JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" diff --git a/pkg/apis/eventing/register.go b/pkg/apis/eventing/register.go index c19a99a3c41..802e5f02167 100644 --- a/pkg/apis/eventing/register.go +++ b/pkg/apis/eventing/register.go @@ -56,6 +56,10 @@ const ( // ScopeCluster indicates the resource must be // handled by the cluster-scoped component ScopeCluster = "cluster" + + // EventTypesAnnotationKey is the annotation key to specify + // if a Source has event types defines in its CRD. + EventTypesAnnotationKey = "registry.knative.dev/eventTypes" ) var ( diff --git a/pkg/apis/eventing/v1alpha1/eventtype_types.go b/pkg/apis/eventing/v1alpha1/eventtype_types.go index 0d7369906cc..4bfa731690e 100644 --- a/pkg/apis/eventing/v1alpha1/eventtype_types.go +++ b/pkg/apis/eventing/v1alpha1/eventtype_types.go @@ -40,6 +40,7 @@ type EventType struct { // Status represents the current state of the EventType. // This data may be out of date. // +optional + // TODO might be removed https://github.com/knative/eventing/issues/2750 Status EventTypeStatus `json:"status,omitempty"` } @@ -61,13 +62,16 @@ type EventTypeSpec struct { // Type represents the CloudEvents type. It is authoritative. Type string `json:"type"` // Source is a URI, it represents the CloudEvents source. - Source string `json:"source"` + // +optional + Source string `json:"source,omitempty"` // Schema is a URI, it represents the CloudEvents schemaurl extension attribute. // It may be a JSON schema, a protobuf schema, etc. It is optional. // +optional Schema string `json:"schema,omitempty"` + // TODO remove https://github.com/knative/eventing/issues/2750 // Broker refers to the Broker that can provide the EventType. - Broker string `json:"broker"` + // +optional + Broker string `json:"broker,omitempty"` // Description is an optional field used to describe the EventType, in any meaningful way. // +optional Description string `json:"description,omitempty"` diff --git a/pkg/apis/eventing/v1alpha1/eventtype_validation.go b/pkg/apis/eventing/v1alpha1/eventtype_validation.go index 8bc9fa6f7c4..20234c693c0 100644 --- a/pkg/apis/eventing/v1alpha1/eventtype_validation.go +++ b/pkg/apis/eventing/v1alpha1/eventtype_validation.go @@ -19,8 +19,6 @@ package v1alpha1 import ( "context" - "github.com/google/go-cmp/cmp/cmpopts" - "knative.dev/pkg/apis" "knative.dev/pkg/kmp" ) @@ -35,15 +33,6 @@ func (ets *EventTypeSpec) Validate(ctx context.Context) *apis.FieldError { fe := apis.ErrMissingField("type") errs = errs.Also(fe) } - if ets.Source == "" { - // TODO validate is a valid URI. - fe := apis.ErrMissingField("source") - errs = errs.Also(fe) - } - if ets.Broker == "" { - fe := apis.ErrMissingField("broker") - errs = errs.Also(fe) - } // TODO validate Schema is a valid URI. return errs } @@ -53,9 +42,8 @@ func (et *EventType) CheckImmutableFields(ctx context.Context, original *EventTy return nil } - // All but Description field immutable. - ignoreArguments := cmpopts.IgnoreFields(EventTypeSpec{}, "Description") - if diff, err := kmp.ShortDiff(original.Spec, et.Spec, ignoreArguments); err != nil { + // All fields immutable + if diff, err := kmp.ShortDiff(original.Spec, et.Spec); err != nil { return &apis.FieldError{ Message: "Failed to diff EventType", Paths: []string{"spec"}, diff --git a/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go b/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go index 156131e9901..ad1b37121e2 100644 --- a/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/eventtype_validation_test.go @@ -25,16 +25,16 @@ import ( ) func TestEventTypeValidation(t *testing.T) { - name := "invalid type and source and broker" - broker := &EventType{Spec: EventTypeSpec{}} + name := "invalid type" + et := &EventType{Spec: EventTypeSpec{}} want := &apis.FieldError{ - Paths: []string{"spec.type", "spec.source", "spec.broker"}, + Paths: []string{"spec.type"}, Message: "missing field(s)", } t.Run(name, func(t *testing.T) { - got := broker.Validate(context.TODO()) + got := et.Validate(context.TODO()) if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { t.Errorf("EventType.Validate (-want, +got) = %v", diff) } @@ -47,42 +47,12 @@ func TestEventTypeSpecValidation(t *testing.T) { ets *EventTypeSpec want *apis.FieldError }{{ - name: "invalid eventtype spec", - ets: &EventTypeSpec{}, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("type", "source", "broker") - return fe - }(), - }, { name: "invalid eventtype type", - ets: &EventTypeSpec{ - Source: "test-source", - Broker: "test-broker", - }, + ets: &EventTypeSpec{}, want: func() *apis.FieldError { fe := apis.ErrMissingField("type") return fe }(), - }, { - name: "invalid eventtype source", - ets: &EventTypeSpec{ - Type: "test-type", - Broker: "test-broker", - }, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("source") - return fe - }(), - }, { - name: "invalid eventtype broker", - ets: &EventTypeSpec{ - Type: "test-type", - Source: "test-source", - }, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("broker") - return fe - }(), }, } @@ -232,7 +202,7 @@ func TestEventTypeImmutableFields(t *testing.T) { `, }, }, { - name: "good (description change)", + name: "bad (description change)", current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", @@ -251,9 +221,15 @@ func TestEventTypeImmutableFields(t *testing.T) { Description: "original-description", }, }, - want: nil, - }, - } + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1alpha1.EventTypeSpec}.Description: + -: "original-description" + +: "test-description" +`, + }, + }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go b/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go index aa87b211e3c..7a198dc514b 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go @@ -154,7 +154,7 @@ func (ts *TriggerStatus) MarkDependencyUnknown(reason, messageFormat string, mes } func (ts *TriggerStatus) MarkDependencyNotConfigured() { - triggerCondSet.Manage(ts).MarkUnknown(EventTypeConditionBrokerReady, + triggerCondSet.Manage(ts).MarkUnknown(TriggerConditionDependency, "DependencyNotConfigured", "Dependency has not yet been reconciled.") } diff --git a/pkg/apis/eventing/v1beta1/eventtype_defaults_test.go b/pkg/apis/eventing/v1beta1/eventtype_defaults_test.go index 0a5cc1dc5bc..3c29335b9d7 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_defaults_test.go +++ b/pkg/apis/eventing/v1beta1/eventtype_defaults_test.go @@ -44,7 +44,7 @@ func TestEventTypeDefaults(t *testing.T) { initial: EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "", Schema: testSchema, }, @@ -52,7 +52,7 @@ func TestEventTypeDefaults(t *testing.T) { expected: EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "default", Schema: testSchema, }, @@ -62,14 +62,14 @@ func TestEventTypeDefaults(t *testing.T) { initial: EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Schema: testSchema, }, }, expected: EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "default", Schema: testSchema, }, diff --git a/pkg/apis/eventing/v1beta1/eventtype_types.go b/pkg/apis/eventing/v1beta1/eventtype_types.go index 8c1c3cc64a0..dbdf3f760ce 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_types.go +++ b/pkg/apis/eventing/v1beta1/eventtype_types.go @@ -39,6 +39,7 @@ type EventType struct { // Status represents the current state of the EventType. // This data may be out of date. // +optional + // TODO might be removed https://github.com/knative/eventing/issues/2750 Status EventTypeStatus `json:"status,omitempty"` } @@ -60,7 +61,8 @@ type EventTypeSpec struct { // Type represents the CloudEvents type. It is authoritative. Type string `json:"type"` // Source is a URI, it represents the CloudEvents source. - Source apis.URL `json:"source"` + // +optional + Source *apis.URL `json:"source,omitempty"` // Schema is a URI, it represents the CloudEvents schemaurl extension attribute. // It may be a JSON schema, a protobuf schema, etc. It is optional. // +optional @@ -70,8 +72,10 @@ type EventTypeSpec struct { // The contents are not validated or manipulated by the system. // +optional SchemaData string `json:"schemaData,omitempty"` + // TODO remove https://github.com/knative/eventing/issues/2750 // Broker refers to the Broker that can provide the EventType. - Broker string `json:"broker"` + // +optional + Broker string `json:"broker,omitempty"` // Description is an optional field used to describe the EventType, in any meaningful way. // +optional Description string `json:"description,omitempty"` diff --git a/pkg/apis/eventing/v1beta1/eventtype_validation.go b/pkg/apis/eventing/v1beta1/eventtype_validation.go index 993924a4f77..ae5419f3657 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_validation.go +++ b/pkg/apis/eventing/v1beta1/eventtype_validation.go @@ -19,8 +19,6 @@ package v1beta1 import ( "context" - "github.com/google/go-cmp/cmp/cmpopts" - "knative.dev/pkg/apis" "knative.dev/pkg/kmp" ) @@ -35,15 +33,7 @@ func (ets *EventTypeSpec) Validate(ctx context.Context) *apis.FieldError { fe := apis.ErrMissingField("type") errs = errs.Also(fe) } - if ets.Source.IsEmpty() { - // TODO validate is a valid URI. - fe := apis.ErrMissingField("source") - errs = errs.Also(fe) - } - if ets.Broker == "" { - fe := apis.ErrMissingField("broker") - errs = errs.Also(fe) - } + // TODO validate Source is a valid URI. // TODO validate Schema is a valid URI. // There is no validation of the SchemaData, it is application specific data. return errs @@ -54,9 +44,8 @@ func (et *EventType) CheckImmutableFields(ctx context.Context, original *EventTy return nil } - // All but Description field immutable. - ignoreArguments := cmpopts.IgnoreFields(EventTypeSpec{}, "Description") - if diff, err := kmp.ShortDiff(original.Spec, et.Spec, ignoreArguments); err != nil { + // All fields are immutable. + if diff, err := kmp.ShortDiff(original.Spec, et.Spec); err != nil { return &apis.FieldError{ Message: "Failed to diff EventType", Paths: []string{"spec"}, diff --git a/pkg/apis/eventing/v1beta1/eventtype_validation_test.go b/pkg/apis/eventing/v1beta1/eventtype_validation_test.go index e5d478b4aa4..ce01feaab9d 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_validation_test.go +++ b/pkg/apis/eventing/v1beta1/eventtype_validation_test.go @@ -25,16 +25,16 @@ import ( ) func TestEventTypeValidation(t *testing.T) { - name := "invalid type and source and broker" - broker := &EventType{Spec: EventTypeSpec{}} + name := "invalid type" + et := &EventType{Spec: EventTypeSpec{}} want := &apis.FieldError{ - Paths: []string{"spec.type", "spec.source", "spec.broker"}, + Paths: []string{"spec.type"}, Message: "missing field(s)", } t.Run(name, func(t *testing.T) { - got := broker.Validate(context.TODO()) + got := et.Validate(context.TODO()) if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { t.Errorf("EventType.Validate (-want, +got) = %v", diff) } @@ -48,42 +48,19 @@ func TestEventTypeSpecValidation(t *testing.T) { ets *EventTypeSpec want *apis.FieldError }{{ - name: "invalid eventtype spec", - ets: &EventTypeSpec{}, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("type", "source", "broker") - return fe - }(), - }, { name: "invalid eventtype type", - ets: &EventTypeSpec{ - Source: *testSource, - Broker: "test-broker", - }, + ets: &EventTypeSpec{}, want: func() *apis.FieldError { fe := apis.ErrMissingField("type") return fe }(), }, { - name: "invalid eventtype source", + name: "valid eventtype", ets: &EventTypeSpec{ Type: "test-type", + Source: testSource, Broker: "test-broker", }, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("source") - return fe - }(), - }, { - name: "invalid eventtype broker", - ets: &EventTypeSpec{ - Type: "test-type", - Source: *testSource, - }, - want: func() *apis.FieldError { - fe := apis.ErrMissingField("broker") - return fe - }(), }, } @@ -113,7 +90,7 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, SchemaData: testSchemaData, @@ -122,7 +99,7 @@ func TestEventTypeImmutableFields(t *testing.T) { original: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, SchemaData: testSchemaData, @@ -134,7 +111,7 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, }, @@ -146,14 +123,14 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", }, }, original: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "original-broker", }, }, @@ -170,14 +147,14 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", }, }, original: &EventType{ Spec: EventTypeSpec{ Type: "original-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", }, }, @@ -194,14 +171,14 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", }, }, original: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *differentSource, + Source: differentSource, Broker: "test-broker", }, }, @@ -218,7 +195,7 @@ func TestEventTypeImmutableFields(t *testing.T) { current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, }, @@ -226,7 +203,7 @@ func TestEventTypeImmutableFields(t *testing.T) { original: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: differentSchema, }, @@ -240,11 +217,11 @@ func TestEventTypeImmutableFields(t *testing.T) { `, }, }, { - name: "good (description change)", + name: "bad (description change)", current: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, Description: "test-description", @@ -253,13 +230,20 @@ func TestEventTypeImmutableFields(t *testing.T) { original: &EventType{ Spec: EventTypeSpec{ Type: "test-type", - Source: *testSource, + Source: testSource, Broker: "test-broker", Schema: testSchema, Description: "original-description", }, }, - want: nil, + want: &apis.FieldError{ + Message: "Immutable fields changed (-old +new)", + Paths: []string{"spec"}, + Details: `{v1beta1.EventTypeSpec}.Description: + -: "original-description" + +: "test-description" +`, + }, }, } diff --git a/pkg/apis/eventing/v1beta1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1beta1/zz_generated.deepcopy.go index e38b3d7fb94..38bdb083405 100644 --- a/pkg/apis/eventing/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1beta1/zz_generated.deepcopy.go @@ -196,7 +196,11 @@ func (in *EventTypeList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventTypeSpec) DeepCopyInto(out *EventTypeSpec) { *out = *in - in.Source.DeepCopyInto(&out.Source) + if in.Source != nil { + in, out := &in.Source, &out.Source + *out = new(apis.URL) + (*in).DeepCopyInto(*out) + } if in.Schema != nil { in, out := &in.Schema, &out.Schema *out = new(apis.URL) diff --git a/pkg/apis/sources/register.go b/pkg/apis/sources/register.go index 73f43e9dc23..55899c3fd77 100644 --- a/pkg/apis/sources/register.go +++ b/pkg/apis/sources/register.go @@ -16,10 +16,22 @@ limitations under the License. package sources -import "k8s.io/apimachinery/pkg/runtime/schema" +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/apis/duck" +) const ( GroupName = "sources.knative.dev" + + // SourceDuckAnnotationKey is the annotation key to indicate + // whether the CRD is a Source duck type. + // Valid values: "true" or "false" + SourceDuckAnnotationKey = duck.GroupName + "/source" + + // SourceDuckAnnotationValue is the annotation value to indicate + // the CRD is a Source duck type. + SourceDuckAnnotationValue = "true" ) var ( diff --git a/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go b/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go index 7e3240e8afc..72947aee088 100644 --- a/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go +++ b/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go @@ -37,9 +37,6 @@ const ( // ApiServerConditionSufficientPermissions has status True when the ApiServerSource has sufficient permissions to access resources. ApiServerConditionSufficientPermissions apis.ConditionType = "SufficientPermissions" - - // ApiServerConditionEventTypeProvided has status True when the ApiServerSource has been configured with its event types. - ApiServerConditionEventTypeProvided apis.ConditionType = "EventTypesProvided" ) var apiserverCondSet = apis.NewLivingConditionSet( @@ -121,16 +118,6 @@ func (s *ApiServerSourceStatus) PropagateDeploymentAvailability(d *appsv1.Deploy } } -// MarkEventTypes sets the condition that the source has set its event type. -func (s *ApiServerSourceStatus) MarkEventTypes() { - apiserverCondSet.Manage(s).MarkTrue(ApiServerConditionEventTypeProvided) -} - -// MarkNoEventTypes sets the condition that the source does not its event type configured. -func (s *ApiServerSourceStatus) MarkNoEventTypes(reason, messageFormat string, messageA ...interface{}) { - apiserverCondSet.Manage(s).MarkFalse(ApiServerConditionEventTypeProvided, reason, messageFormat, messageA...) -} - // MarkSufficientPermissions sets the condition that the source has enough permissions to access the resources. func (s *ApiServerSourceStatus) MarkSufficientPermissions() { apiserverCondSet.Manage(s).MarkTrue(ApiServerConditionSufficientPermissions) diff --git a/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go b/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go index 86b62aefa4d..5d39febcaae 100644 --- a/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go +++ b/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go @@ -98,15 +98,6 @@ func TestApiServerSourceStatusIsReady(t *testing.T) { return s }(), want: false, - }, { - name: "mark event types", - s: func() *ApiServerSourceStatus { - s := &ApiServerSourceStatus{} - s.InitializeConditions() - s.MarkEventTypes() - return s - }(), - want: false, }, { name: "mark sink and sufficient permissions and deployed", s: func() *ApiServerSourceStatus { @@ -118,18 +109,6 @@ func TestApiServerSourceStatusIsReady(t *testing.T) { return s }(), want: true, - }, { - name: "mark sink and sufficient permissions and deployed and event types", - s: func() *ApiServerSourceStatus { - s := &ApiServerSourceStatus{} - s.InitializeConditions() - s.MarkSink("uri://example") - s.MarkSufficientPermissions() - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventTypes() - return s - }(), - want: true, }, { name: "mark sink and not enough permissions", s: func() *ApiServerSourceStatus { @@ -216,40 +195,6 @@ func TestApiServerSourceStatusGetCondition(t *testing.T) { Type: ApiServerConditionReady, Status: corev1.ConditionTrue, }, - }, { - name: "mark sink and enough permissions and deployed and event types", - s: func() *ApiServerSourceStatus { - s := &ApiServerSourceStatus{} - s.InitializeConditions() - s.MarkSink("uri://example") - s.MarkSufficientPermissions() - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventTypes() - return s - }(), - condQuery: ApiServerConditionReady, - want: &apis.Condition{ - Type: ApiServerConditionReady, - Status: corev1.ConditionTrue, - }, - }, { - name: "mark sink empty and enough permissions and deployed and event types", - s: func() *ApiServerSourceStatus { - s := &ApiServerSourceStatus{} - s.InitializeConditions() - s.MarkSink("") - s.MarkSufficientPermissions() - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventTypes() - return s - }(), - condQuery: ApiServerConditionReady, - want: &apis.Condition{ - Type: ApiServerConditionReady, - Status: corev1.ConditionFalse, - Reason: "SinkEmpty", - Message: "Sink has resolved to empty.", - }, }} for _, test := range tests { diff --git a/pkg/apis/sources/v1alpha1/apiserver_types.go b/pkg/apis/sources/v1alpha1/apiserver_types.go index 39b01dffbc9..d1ed0731393 100644 --- a/pkg/apis/sources/v1alpha1/apiserver_types.go +++ b/pkg/apis/sources/v1alpha1/apiserver_types.go @@ -46,6 +46,16 @@ var ( _ apis.HasSpec = (*ApiServerSource)(nil) ) +// ApiServerSourceEventTypes is the list of CloudEvent types the ApiServerSource emits. +var ApiServerSourceEventTypes = []string{ + ApiServerSourceAddEventType, + ApiServerSourceDeleteEventType, + ApiServerSourceUpdateEventType, + ApiServerSourceAddRefEventType, + ApiServerSourceDeleteRefEventType, + ApiServerSourceUpdateRefEventType, +} + const ( // ApiServerSourceAddEventType is the ApiServerSource CloudEvent type for adds. ApiServerSourceAddEventType = "dev.knative.apiserver.resource.add" diff --git a/pkg/apis/sources/v1alpha1/ping_conversion.go b/pkg/apis/sources/v1alpha1/ping_conversion.go index 468c0d1b943..b1c7c5ae654 100644 --- a/pkg/apis/sources/v1alpha1/ping_conversion.go +++ b/pkg/apis/sources/v1alpha1/ping_conversion.go @@ -38,8 +38,9 @@ func (source *PingSource) ConvertTo(ctx context.Context, obj apis.Convertible) e } sink.Status = v1alpha2.PingSourceStatus{ SourceStatus: duckv1.SourceStatus{ - Status: source.Status.Status, - SinkURI: source.Status.SinkURI, + Status: source.Status.Status, + SinkURI: source.Status.SinkURI, + CloudEventAttributes: source.Status.CloudEventAttributes, }, } // Optionals @@ -71,8 +72,11 @@ func (sink *PingSource) ConvertFrom(ctx context.Context, obj apis.Convertible) e CloudEventOverrides: source.Spec.CloudEventOverrides, } sink.Status = PingSourceStatus{ - Status: source.Status.Status, - SinkURI: source.Status.SinkURI, + SourceStatus: duckv1.SourceStatus{ + Status: source.Status.Status, + SinkURI: source.Status.SinkURI, + CloudEventAttributes: source.Status.CloudEventAttributes, + }, } if reflect.DeepEqual(*sink.Spec.Sink, duckv1.Destination{}) { sink.Spec.Sink = nil diff --git a/pkg/apis/sources/v1alpha1/ping_conversion_test.go b/pkg/apis/sources/v1alpha1/ping_conversion_test.go index 356bb0253ec..7142779a5dd 100644 --- a/pkg/apis/sources/v1alpha1/ping_conversion_test.go +++ b/pkg/apis/sources/v1alpha1/ping_conversion_test.go @@ -82,12 +82,14 @@ func TestPingSourceConversionRoundTripUp(t *testing.T) { }, Spec: PingSourceSpec{}, Status: PingSourceStatus{ - Status: duckv1.Status{ - ObservedGeneration: 1, - Conditions: duckv1.Conditions{{ - Type: "Ready", - Status: "True", - }}, + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, }, }, }, @@ -102,12 +104,14 @@ func TestPingSourceConversionRoundTripUp(t *testing.T) { Sink: &sink, }, Status: PingSourceStatus{ - Status: duckv1.Status{ - ObservedGeneration: 1, - Conditions: duckv1.Conditions{{ - Type: "Ready", - Status: "Unknown", - }}, + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "Unknown", + }}, + }, }, }, }, @@ -122,14 +126,16 @@ func TestPingSourceConversionRoundTripUp(t *testing.T) { // TODO: full spec }, Status: PingSourceStatus{ - Status: duckv1.Status{ - ObservedGeneration: 1, - Conditions: duckv1.Conditions{{ - Type: "Ready", - Status: "True", - }}, + SourceStatus: duckv1.SourceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + SinkURI: sinkUri, }, - SinkURI: sinkUri, }, }, }} @@ -177,6 +183,11 @@ func TestPingSourceConversionRoundTripDown(t *testing.T) { }, } + ceAttributes := []duckv1.CloudEventAttributes{{ + Type: PingSourceEventType, + Source: PingSourceSource("ping-ns", "ping-name"), + }} + tests := []struct { name string in apis.Convertible @@ -239,7 +250,8 @@ func TestPingSourceConversionRoundTripDown(t *testing.T) { Status: "True", }}, }, - SinkURI: sinkURI, + SinkURI: sinkURI, + CloudEventAttributes: ceAttributes, }, }, }, diff --git a/pkg/apis/sources/v1alpha1/ping_lifecycle.go b/pkg/apis/sources/v1alpha1/ping_lifecycle.go index 9fb0b313b52..0258ced0562 100644 --- a/pkg/apis/sources/v1alpha1/ping_lifecycle.go +++ b/pkg/apis/sources/v1alpha1/ping_lifecycle.go @@ -39,9 +39,6 @@ const ( // PingSourceConditionDeployed has status True when the PingSource has had it's receive adapter deployment created. PingSourceConditionDeployed apis.ConditionType = "Deployed" - // PingSourceConditionEventTypeProvided has status True when the PingSource has been configured with its event type. - PingSourceConditionEventTypeProvided apis.ConditionType = "EventTypeProvided" - // PingSourceConditionResources is True when the resources listed for the PingSource have been properly // parsed and match specified syntax for resource quantities PingSourceConditionResources apis.ConditionType = "ResourcesCorrect" @@ -131,16 +128,6 @@ func (s *PingSourceStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) } } -// MarkEventType sets the condition that the source has set its event type. -func (s *PingSourceStatus) MarkEventType() { - PingSourceCondSet.Manage(s).MarkTrue(PingSourceConditionEventTypeProvided) -} - -// MarkNoEventType sets the condition that the source does not its event type configured. -func (s *PingSourceStatus) MarkNoEventType(reason, messageFormat string, messageA ...interface{}) { - PingSourceCondSet.Manage(s).MarkFalse(PingSourceConditionEventTypeProvided, reason, messageFormat, messageA...) -} - // MarkResourcesCorrect sets the condition that the source resources are properly parsable quantities func (s *PingSourceStatus) MarkResourcesCorrect() { PingSourceCondSet.Manage(s).MarkTrue(PingSourceConditionResources) diff --git a/pkg/apis/sources/v1alpha1/ping_lifecycle_test.go b/pkg/apis/sources/v1alpha1/ping_lifecycle_test.go index bb8be4d49ba..c65712fb26a 100644 --- a/pkg/apis/sources/v1alpha1/ping_lifecycle_test.go +++ b/pkg/apis/sources/v1alpha1/ping_lifecycle_test.go @@ -105,15 +105,6 @@ func TestPingSourceStatusIsReady(t *testing.T) { return s }(), want: false, - }, { - name: "mark event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkEventType() - return s - }(), - want: false, }, { name: "mark sink and deployed", s: func() *v1alpha1.PingSourceStatus { @@ -145,18 +136,6 @@ func TestPingSourceStatusIsReady(t *testing.T) { return s }(), want: true, - }, { - name: "mark schedule, sink, deployed, and event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventType() - return s - }(), - want: true, }, { name: "mark schedule, sink and deployed then not deployed", s: func() *v1alpha1.PingSourceStatus { @@ -169,18 +148,6 @@ func TestPingSourceStatusIsReady(t *testing.T) { return s }(), want: false, - }, { - name: "mark schedule, sink, deployed and event types then no event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkNoEventType("Testing", "") - return s - }(), - want: true, }, { name: "mark schedule validated, sink empty and deployed", s: func() *v1alpha1.PingSourceStatus { @@ -274,18 +241,6 @@ func TestPingSourceStatusGetTopLevelCondition(t *testing.T) { Type: v1alpha1.PingSourceConditionReady, Status: corev1.ConditionUnknown, }, - }, { - name: "mark event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkEventType() - return s - }(), - want: &apis.Condition{ - Type: v1alpha1.PingSourceConditionReady, - Status: corev1.ConditionUnknown, - }, }, { name: "mark sink and deployed", s: func() *v1alpha1.PingSourceStatus { @@ -326,21 +281,6 @@ func TestPingSourceStatusGetTopLevelCondition(t *testing.T) { Type: v1alpha1.PingSourceConditionReady, Status: corev1.ConditionTrue, }, - }, { - name: "mark schedule, sink, deployed, and event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventType() - return s - }(), - want: &apis.Condition{ - Type: v1alpha1.PingSourceConditionReady, - Status: corev1.ConditionTrue, - }, }, { name: "mark schedule, sink and deployed then not deployed", s: func() *v1alpha1.PingSourceStatus { @@ -358,21 +298,6 @@ func TestPingSourceStatusGetTopLevelCondition(t *testing.T) { Status: corev1.ConditionFalse, Message: "The Deployment '' is unavailable.", }, - }, { - name: "mark schedule, sink, deployed and event types then no event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkNoEventType("Testing", "") - return s - }(), - want: &apis.Condition{ - Type: v1alpha1.PingSourceConditionReady, - Status: corev1.ConditionTrue, - }, }, { name: "mark schedule validated, sink empty and deployed", s: func() *v1alpha1.PingSourceStatus { @@ -496,22 +421,6 @@ func TestPingSourceStatusGetCondition(t *testing.T) { Type: v1alpha1.PingSourceConditionReady, Status: corev1.ConditionTrue, }, - }, { - name: "mark schedule, sink, deployed, and event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventType() - return s - }(), - condQuery: v1alpha1.PingSourceConditionReady, - want: &apis.Condition{ - Type: v1alpha1.PingSourceConditionReady, - Status: corev1.ConditionTrue, - }, }, { name: "mark schedule, sink and deployed then no sink", s: func() *v1alpha1.PingSourceStatus { @@ -584,23 +493,6 @@ func TestPingSourceStatusGetCondition(t *testing.T) { Reason: "DeploymentUnavailable", Message: "The Deployment '' is unavailable.", }, - }, { - name: "mark schedule, sink, deployed and event types, then no event types", - s: func() *v1alpha1.PingSourceStatus { - s := &v1alpha1.PingSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink(exampleUri) - s.PropagateDeploymentAvailability(availableDeployment) - s.MarkEventType() - s.MarkNoEventType("Testing", "hi") - return s - }(), - condQuery: v1alpha1.PingSourceConditionReady, - want: &apis.Condition{ - Type: v1alpha1.PingSourceConditionReady, - Status: corev1.ConditionTrue, - }, }, { name: "mark schedule, sink empty and deployed", s: func() *v1alpha1.PingSourceStatus { diff --git a/pkg/apis/sources/v1alpha1/ping_types.go b/pkg/apis/sources/v1alpha1/ping_types.go index c500c305f53..578c792a839 100644 --- a/pkg/apis/sources/v1alpha1/ping_types.go +++ b/pkg/apis/sources/v1alpha1/ping_types.go @@ -96,14 +96,14 @@ type PingSourceSpec struct { // PingSourceStatus defines the observed state of PingSource. type PingSourceStatus struct { - // inherits duck/v1 Status, which currently provides: - // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. - // * Conditions - the latest available observations of a resource's current state. - duckv1.Status `json:",inline"` - - // SinkURI is the current active sink URI that has been configured for the PingSource. - // +optional - SinkURI *apis.URL `json:"sinkUri,omitempty"` + // inherits duck/v1 SourceStatus, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last + // processed by the controller. + // * Conditions - the latest available observations of a resource's current + // state. + // * SinkURI - the current active sink URI that has been configured for the + // Source. + duckv1.SourceStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 30c969b98e6..40338335045 100644 --- a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -24,7 +24,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" v1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2" - apis "knative.dev/pkg/apis" v1 "knative.dev/pkg/apis/duck/v1" v1beta1 "knative.dev/pkg/apis/duck/v1beta1" ) @@ -309,12 +308,7 @@ func (in *PingSourceSpec) DeepCopy() *PingSourceSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PingSourceStatus) DeepCopyInto(out *PingSourceStatus) { *out = *in - in.Status.DeepCopyInto(&out.Status) - if in.SinkURI != nil { - in, out := &in.SinkURI, &out.SinkURI - *out = new(apis.URL) - (*in).DeepCopyInto(*out) - } + in.SourceStatus.DeepCopyInto(&out.SourceStatus) return } diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index 738fcf89b25..2696ba02f75 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -29,17 +29,15 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" apiserversourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/apiserversource" - eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/apiserversource/resources" + duckv1 "knative.dev/pkg/apis/duck/v1" duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" pkgLogging "knative.dev/pkg/logging" "knative.dev/pkg/metrics" @@ -55,15 +53,6 @@ const ( component = "apiserversource" ) -var apiServerEventTypes = []string{ - v1alpha1.ApiServerSourceAddEventType, - v1alpha1.ApiServerSourceDeleteEventType, - v1alpha1.ApiServerSourceUpdateEventType, - v1alpha1.ApiServerSourceAddRefEventType, - v1alpha1.ApiServerSourceDeleteRefEventType, - v1alpha1.ApiServerSourceUpdateRefEventType, -} - // newReconciledNormal makes a new reconciler event with event type Normal, and // reason ApiServerSourceReconciled. func newReconciledNormal(namespace, name string) pkgreconciler.Event { @@ -83,7 +72,6 @@ type Reconciler struct { // listers index properties about resources apiserversourceLister listers.ApiServerSourceLister - eventTypeLister eventinglisters.EventTypeLister source string sinkResolver *resolver.URIResolver @@ -148,12 +136,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.ApiServ } source.Status.PropagateDeploymentAvailability(ra) - err = r.reconcileEventTypes(ctx, source) - if err != nil { - source.Status.MarkNoEventTypes("EventTypesReconcileFailed", "") - return fmt.Errorf("reconciling event types: %v", err) - } - source.Status.MarkEventTypes() + source.Status.CloudEventAttributes = r.createCloudEventAttributes() return newReconciledNormal(source.Namespace, source.Name) } @@ -225,118 +208,6 @@ func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1 return false } -func (r *Reconciler) reconcileEventTypes(ctx context.Context, src *v1alpha1.ApiServerSource) error { - current, err := r.getEventTypes(ctx, src) - if err != nil { - logging.FromContext(ctx).Error("Unable to get existing event types", zap.Error(err)) - return err - } - - expected, err := r.makeEventTypes(src) - if err != nil { - return err - } - - toCreate, toDelete := r.computeDiff(current, expected) - - for _, eventType := range toDelete { - if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(eventType.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting eventType", zap.Any("eventType", eventType)) - return err - } - } - - for _, eventType := range toCreate { - if _, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(&eventType); err != nil { - logging.FromContext(ctx).Error("Error creating eventType", zap.Any("eventType", eventType)) - return err - } - } - - return nil -} - -func (r *Reconciler) getEventTypes(ctx context.Context, src *v1alpha1.ApiServerSource) ([]eventingv1alpha1.EventType, error) { - etl, err := r.eventTypeLister.EventTypes(src.Namespace).List(r.getLabelSelector(src)) - if err != nil { - logging.FromContext(ctx).Error("Unable to list event types: %v", zap.Error(err)) - return nil, err - } - eventTypes := make([]eventingv1alpha1.EventType, 0) - for _, et := range etl { - if metav1.IsControlledBy(et, src) { - eventTypes = append(eventTypes, *et) - } - } - return eventTypes, nil -} - -func (r *Reconciler) makeEventTypes(src *v1alpha1.ApiServerSource) ([]eventingv1alpha1.EventType, error) { - eventTypes := make([]eventingv1alpha1.EventType, 0) - - // Only create EventTypes for Broker sinks. - // We add this check here in case the APIServerSource was changed from Broker to non-Broker sink. - // If so, we need to delete the existing ones, thus we return empty expected. - if ref := src.Spec.Sink.GetRef(); ref == nil || ref.Kind != "Broker" { - return eventTypes, nil - } - - args := &resources.EventTypeArgs{ - Src: src, - Source: r.source, - } - for _, apiEventType := range apiServerEventTypes { - args.Type = apiEventType - eventType := resources.MakeEventType(args) - eventTypes = append(eventTypes, eventType) - } - return eventTypes, nil -} - -func (r *Reconciler) computeDiff(current []eventingv1alpha1.EventType, expected []eventingv1alpha1.EventType) ([]eventingv1alpha1.EventType, []eventingv1alpha1.EventType) { - toCreate := make([]eventingv1alpha1.EventType, 0) - toDelete := make([]eventingv1alpha1.EventType, 0) - currentMap := asMap(current, keyFromEventType) - expectedMap := asMap(expected, keyFromEventType) - - // Iterate over the slices instead of the maps for predictable UT expectations. - for _, e := range expected { - if c, ok := currentMap[keyFromEventType(&e)]; !ok { - toCreate = append(toCreate, e) - } else { - if !equality.Semantic.DeepEqual(e.Spec, c.Spec) { - toDelete = append(toDelete, c) - toCreate = append(toCreate, e) - } - } - } - // Need to check whether the current EventTypes are not in the expected map. If so, we have to delete them. - // This could happen if the ApiServerSource CO changes its broker. - for _, c := range current { - if _, ok := expectedMap[keyFromEventType(&c)]; !ok { - toDelete = append(toDelete, c) - } - } - return toCreate, toDelete -} - -func asMap(eventTypes []eventingv1alpha1.EventType, keyFunc func(*eventingv1alpha1.EventType) string) map[string]eventingv1alpha1.EventType { - eventTypesAsMap := make(map[string]eventingv1alpha1.EventType, 0) - for _, eventType := range eventTypes { - key := keyFunc(&eventType) - eventTypesAsMap[key] = eventType - } - return eventTypesAsMap -} - -func keyFromEventType(eventType *eventingv1alpha1.EventType) string { - return fmt.Sprintf("%s_%s_%s_%s", eventType.Spec.Type, eventType.Spec.Source, eventType.Spec.Schema, eventType.Spec.Broker) -} - -func (r *Reconciler) getLabelSelector(src *v1alpha1.ApiServerSource) labels.Selector { - return labels.SelectorFromSet(resources.Labels(src.Name)) -} - // TODO determine how to push the updated logging config to existing data plane Pods. func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { if cfg != nil { @@ -349,7 +220,7 @@ func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { return } r.loggingConfig = logcfg - logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Debug("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) } // TODO determine how to push the updated metrics config to existing data plane Pods. @@ -363,7 +234,7 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { Component: component, ConfigMap: cfg.Data, } - logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Debug("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) } func (r *Reconciler) runAccessCheck(src *v1alpha1.ApiServerSource) error { @@ -432,3 +303,14 @@ func (r *Reconciler) runAccessCheck(src *v1alpha1.ApiServerSource) error { return fmt.Errorf("Insufficient permission: user %s cannot %s", user, missing) } + +func (r *Reconciler) createCloudEventAttributes() []duckv1.CloudEventAttributes { + ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(v1alpha1.ApiServerSourceEventTypes)) + for _, apiServerSourceType := range v1alpha1.ApiServerSourceEventTypes { + ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{ + Type: apiServerSourceType, + Source: r.source, + }) + } + return ceAttributes +} diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go index 3b80b383cf4..39fe92cb2ba 100644 --- a/pkg/reconciler/apiserversource/apiserversource_test.go +++ b/pkg/reconciler/apiserversource/apiserversource_test.go @@ -18,20 +18,16 @@ package apiserversource import ( "context" - "fmt" "testing" appsv1 "k8s.io/api/apps/v1" authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/apiserversource" "knative.dev/eventing/pkg/reconciler" @@ -185,7 +181,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeployed, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -236,7 +232,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeployed, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -295,7 +291,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeployed, WithApiServerSourceSinkDepRef(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -352,7 +348,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeployed, WithApiServerSourceSink(sinkTargetURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -403,7 +399,7 @@ func TestReconcile(t *testing.T) { WithInitApiServerSourceConditions, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceDeploymentUnavailable, WithApiServerSourceStatusObservedGeneration(generation), ), @@ -461,7 +457,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeploymentUnavailable, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -518,7 +514,7 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeploymentUnavailable, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, @@ -532,61 +528,6 @@ func TestReconcile(t *testing.T) { }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. - }, { - Name: "valid with event types to delete", - Objects: []runtime.Object{ - NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &sinkDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - ), - NewChannel(sinkName, testNS, - WithInitChannelConditions, - WithChannelAddress(sinkDNS), - ), - makeEventTypeWithName(sourcesv1alpha1.ApiServerSourceAddEventType, "name-1"), - makeAvailableReceiveAdapter(), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "ApiServerSourceReconciled", `ApiServerSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &sinkDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - // Status Update: - WithInitApiServerSourceConditions, - WithApiServerSourceDeployed, - WithApiServerSourceSink(sinkURI), - WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, - WithApiServerSourceStatusObservedGeneration(generation), - ), - }}, - WantDeletes: []clientgotesting.DeleteActionImpl{ - {Name: "name-1"}, - }, - WantCreates: []runtime.Object{ - makeSubjectAccessReview("namespaces", "get", "default"), - makeSubjectAccessReview("namespaces", "list", "default"), - makeSubjectAccessReview("namespaces", "watch", "default"), - }, - WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, - SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { Name: "valid with broker sink", Objects: []runtime.Object{ @@ -627,140 +568,11 @@ func TestReconcile(t *testing.T) { WithApiServerSourceDeployed, WithApiServerSourceSink(sinkURI), WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, - WithApiServerSourceStatusObservedGeneration(generation), - ), - }}, - WantCreates: []runtime.Object{ - makeEventType(sourcesv1alpha1.ApiServerSourceAddEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceDeleteEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceUpdateEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceAddRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceDeleteRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceUpdateRefEventType), - makeSubjectAccessReview("namespaces", "get", "default"), - makeSubjectAccessReview("namespaces", "list", "default"), - makeSubjectAccessReview("namespaces", "watch", "default"), - }, - WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, - SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. - }, { - Name: "valid with broker sink and missing event types", - Objects: []runtime.Object{ - NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &brokerDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - ), - NewBroker(sinkName, testNS, - WithInitBrokerConditions, - WithBrokerAddress(sinkDNS), - ), - makeEventTypeWithName(sourcesv1alpha1.ApiServerSourceAddEventType, "name-1"), - makeEventTypeWithName(sourcesv1alpha1.ApiServerSourceDeleteEventType, "name-2"), - makeEventTypeWithName(sourcesv1alpha1.ApiServerSourceUpdateEventType, "name-3"), - makeAvailableReceiveAdapter(), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "ApiServerSourceReconciled", `ApiServerSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &brokerDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - // Status Update: - WithInitApiServerSourceConditions, - WithApiServerSourceDeployed, - WithApiServerSourceSink(sinkURI), - WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, + WithApiServerSourceEventTypes(source), WithApiServerSourceStatusObservedGeneration(generation), ), }}, WantCreates: []runtime.Object{ - makeEventType(sourcesv1alpha1.ApiServerSourceAddRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceDeleteRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceUpdateRefEventType), - makeSubjectAccessReview("namespaces", "get", "default"), - makeSubjectAccessReview("namespaces", "list", "default"), - makeSubjectAccessReview("namespaces", "watch", "default"), - }, - WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, - SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. - }, { - Name: "valid with broker sink and event types to delete", - Objects: []runtime.Object{ - NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &brokerDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - ), - NewBroker(sinkName, testNS, - WithInitBrokerConditions, - WithBrokerAddress(sinkDNS), - ), - // https://github.com/knative/pkg/issues/411 - // Be careful adding more EventTypes here, the current unit test lister does not - // return items in a fixed order, so the EventTypes can come back in any order. - // WantDeletes requires the order to be correct, so will be flaky if we add more - // than one EventType here. - makeEventTypeWithName("type1", "name-1"), - makeAvailableReceiveAdapter(), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "ApiServerSourceReconciled", `ApiServerSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewApiServerSource(sourceName, testNS, - WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ - Resources: []sourcesv1alpha1.ApiServerResource{{ - APIVersion: "", - Kind: "Namespace", - }}, - Sink: &brokerDest, - }), - WithApiServerSourceUID(sourceUID), - WithApiServerSourceObjectMetaGeneration(generation), - // Status Update: - WithInitApiServerSourceConditions, - WithApiServerSourceDeployed, - WithApiServerSourceSink(sinkURI), - WithApiServerSourceSufficientPermissions, - WithApiServerSourceEventTypes, - WithApiServerSourceStatusObservedGeneration(generation), - ), - }}, - WantDeletes: []clientgotesting.DeleteActionImpl{ - {Name: "name-1"}, - }, - WantCreates: []runtime.Object{ - makeEventType(sourcesv1alpha1.ApiServerSourceAddEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceDeleteEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceUpdateEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceAddRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceDeleteRefEventType), - makeEventType(sourcesv1alpha1.ApiServerSourceUpdateRefEventType), makeSubjectAccessReview("namespaces", "get", "default"), makeSubjectAccessReview("namespaces", "list", "default"), makeSubjectAccessReview("namespaces", "watch", "default"), @@ -775,7 +587,6 @@ func TestReconcile(t *testing.T) { r := &Reconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), apiserversourceLister: listers.GetApiServerSourceLister(), - eventTypeLister: listers.GetEventTypeLister(), source: source, receiveAdapterImage: image, sinkResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), @@ -868,34 +679,6 @@ func makeReceiveAdapterWithDifferentContainerCount() *appsv1.Deployment { return ra } -func makeEventTypeWithName(eventType, name string) *v1alpha1.EventType { - et := makeEventType(eventType) - et.Name = name - return et -} - -func makeEventType(eventType string) *v1alpha1.EventType { - return &v1alpha1.EventType{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", eventType, sourceUID), - Labels: resources.Labels(sourceName), - Namespace: testNS, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(makeApiServerSource(), schema.GroupVersionKind{ - Group: sourcesv1alpha1.SchemeGroupVersion.Group, - Version: sourcesv1alpha1.SchemeGroupVersion.Version, - Kind: "ApiServerSource", - }), - }, - }, - Spec: v1alpha1.EventTypeSpec{ - Type: eventType, - Source: source, - Broker: sinkName, - }, - } -} - func makeSubjectAccessReview(resource, verb, sa string) *authorizationv1.SubjectAccessReview { return &authorizationv1.SubjectAccessReview{ Spec: authorizationv1.SubjectAccessReviewSpec{ diff --git a/pkg/reconciler/apiserversource/controller.go b/pkg/reconciler/apiserversource/controller.go index 2573e92e646..9eae3cb4a34 100644 --- a/pkg/reconciler/apiserversource/controller.go +++ b/pkg/reconciler/apiserversource/controller.go @@ -29,7 +29,6 @@ import ( "knative.dev/pkg/metrics" "knative.dev/pkg/resolver" - eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" apiserversourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/apiserversource" apiserversourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/apiserversource" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" @@ -57,12 +56,10 @@ func NewController( deploymentInformer := deploymentinformer.Get(ctx) apiServerSourceInformer := apiserversourceinformer.Get(ctx) - eventTypeInformer := eventtypeinformer.Get(ctx) r := &Reconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), apiserversourceLister: apiServerSourceInformer.Lister(), - eventTypeLister: eventTypeInformer.Lister(), source: GetCfgHost(ctx), loggingContext: ctx, } @@ -85,11 +82,6 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) - eventTypeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("ApiServerSource")), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }) - cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) diff --git a/pkg/reconciler/apiserversource/resources/eventtype.go b/pkg/reconciler/apiserversource/resources/eventtype.go deleted file mode 100644 index 0afe5d24cb0..00000000000 --- a/pkg/reconciler/apiserversource/resources/eventtype.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/apis/sources/v1alpha1" - "knative.dev/eventing/pkg/utils" -) - -// EventTypeArgs are the arguments needed to create an EventType for an Api Server Source. -type EventTypeArgs struct { - Src *v1alpha1.ApiServerSource - Type string - Source string -} - -// MakeEventType creates the in-memory representation of the EventType for the specified ApiServerSource. -func MakeEventType(args *EventTypeArgs) eventingv1alpha1.EventType { - return eventingv1alpha1.EventType{ - ObjectMeta: metav1.ObjectMeta{ - Name: utils.GenerateFixedName(args.Src, utils.ToDNS1123Subdomain(args.Type)), - Labels: Labels(args.Src.Name), - Namespace: args.Src.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(args.Src, schema.GroupVersionKind{ - Group: v1alpha1.SchemeGroupVersion.Group, - Version: v1alpha1.SchemeGroupVersion.Version, - Kind: "ApiServerSource", - }), - }, - }, - Spec: eventingv1alpha1.EventTypeSpec{ - Type: args.Type, - Source: args.Source, - Broker: args.Src.Spec.Sink.GetRef().Name, - }, - } -} diff --git a/pkg/reconciler/eventtype/controller.go b/pkg/reconciler/eventtype/controller.go index 829c5cb79fd..8f7f2060764 100644 --- a/pkg/reconciler/eventtype/controller.go +++ b/pkg/reconciler/eventtype/controller.go @@ -41,6 +41,7 @@ const ( // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events +// TODO remove https://github.com/knative/eventing/issues/2750 func NewController( ctx context.Context, cmw configmap.Watcher, diff --git a/pkg/reconciler/eventtype/eventtype.go b/pkg/reconciler/eventtype/eventtype.go index 8786faf09eb..e95ad0f564a 100644 --- a/pkg/reconciler/eventtype/eventtype.go +++ b/pkg/reconciler/eventtype/eventtype.go @@ -51,6 +51,7 @@ var _ eventtypereconciler.Interface = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. // 1. Verify the Broker exists. // 2. Verify the Broker is ready. +// TODO remove https://github.com/knative/eventing/issues/2750 func (r *Reconciler) ReconcileKind(ctx context.Context, et *v1alpha1.EventType) pkgreconciler.Event { et.Status.InitializeConditions() et.Status.ObservedGeneration = et.Generation diff --git a/pkg/reconciler/pingsource/controller/controller.go b/pkg/reconciler/pingsource/controller/controller.go index 22c851f3741..9d95c1a1668 100644 --- a/pkg/reconciler/pingsource/controller/controller.go +++ b/pkg/reconciler/pingsource/controller/controller.go @@ -34,7 +34,6 @@ import ( "knative.dev/pkg/system" "knative.dev/eventing/pkg/apis/sources/v1alpha1" - eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" pingsourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/pingsource" pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/pingsource" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" @@ -65,13 +64,11 @@ func NewController( deploymentInformer := deploymentinformer.Get(ctx) pingSourceInformer := pingsourceinformer.Get(ctx) - eventTypeInformer := eventtypeinformer.Get(ctx) r := &Reconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), pingLister: pingSourceInformer.Lister(), deploymentLister: deploymentInformer.Lister(), - eventTypeLister: eventTypeInformer.Lister(), loggingContext: ctx, } @@ -109,10 +106,6 @@ func NewController( )), }) - eventTypeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("PingSource")), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }) cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) diff --git a/pkg/reconciler/pingsource/controller/pingsource.go b/pkg/reconciler/pingsource/controller/pingsource.go index ff338ac7783..43131533850 100644 --- a/pkg/reconciler/pingsource/controller/pingsource.go +++ b/pkg/reconciler/pingsource/controller/pingsource.go @@ -39,10 +39,8 @@ import ( "knative.dev/pkg/tracker" "knative.dev/eventing/pkg/apis/eventing" - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/pingsource" - eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler" @@ -81,7 +79,6 @@ type Reconciler struct { // listers index properties about resources pingLister listers.PingSourceLister deploymentLister appsv1listers.DeploymentLister - eventTypeLister eventinglisters.EventTypeLister // tracking jobrunner deployment changes tracker tracker.Interface @@ -164,12 +161,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.PingSou source.Status.PropagateDeploymentAvailability(ra) } - _, err = r.reconcileEventType(ctx, source) - if err != nil { - source.Status.MarkNoEventType("EventTypeReconcileFailed", "") - return fmt.Errorf("reconciling event types: %v", err) - } - source.Status.MarkEventType() + source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{{ + Type: v1alpha1.PingSourceEventType, + Source: v1alpha1.PingSourceSource(source.Namespace, source.Name), + }} return newReconciledNormal(source.Namespace, source.Name) } @@ -307,52 +302,6 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { return false } -func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.PingSource) (*eventingv1alpha1.EventType, error) { - sinkRef := src.Spec.Sink.GetRef() - if sinkRef == nil { - // Can't figure out the broker so return - return nil, nil - } - expected := resources.MakeEventType(src) - current, err := r.eventTypeLister.EventTypes(src.Namespace).Get(expected.Name) - if err != nil && !apierrors.IsNotFound(err) { - logging.FromContext(ctx).Error("Unable to get an existing event type", zap.Error(err)) - return nil, fmt.Errorf("getting event types: %v", err) - } - - // Only create EventTypes for Broker sinks. But if there is an EventType and the src has a non-Broker sink - // (possibly because it was updated), then we need to delete it. - if sinkRef.Kind != "Broker" { - if current != nil { - if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) - return nil, fmt.Errorf("deleting event type: %v", err) - } - } - // No current and no error. - return nil, nil - } - - if current != nil { - if equality.Semantic.DeepEqual(expected.Spec, current.Spec) { - return current, nil - } - // EventTypes are immutable, delete it and create it again. - if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) - return nil, fmt.Errorf("deleting event type: %v", err) - } - } - - current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected) - if err != nil { - logging.FromContext(ctx).Error("Error creating event type", zap.Error(err), zap.Any("eventType", expected)) - return nil, fmt.Errorf("creating event type: %v", err) - } - logging.FromContext(ctx).Debug("EventType created", zap.Any("eventType", current)) - return current, nil -} - // TODO determine how to push the updated logging config to existing data plane Pods. func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { if cfg != nil { @@ -365,7 +314,7 @@ func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { return } r.loggingConfig = logcfg - logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Debug("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) } // TODO determine how to push the updated metrics config to existing data plane Pods. @@ -379,5 +328,5 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { Component: component, ConfigMap: cfg.Data, } - logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Debug("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) } diff --git a/pkg/reconciler/pingsource/controller/pingsource_test.go b/pkg/reconciler/pingsource/controller/pingsource_test.go index ea1d3521f2f..0f03545c348 100644 --- a/pkg/reconciler/pingsource/controller/pingsource_test.go +++ b/pkg/reconciler/pingsource/controller/pingsource_test.go @@ -244,117 +244,6 @@ func TestAllCases(t *testing.T) { WithPingSourceStatusObservedGeneration(generation), ), }}, - }, { - Name: "valid with event type creation", - Objects: []runtime.Object{ - NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &brokerDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - ), - NewBroker(sinkName, testNS, - WithInitBrokerConditions, - WithBrokerAddress(sinkDNS), - ), - makeAvailableReceiveAdapter(brokerDest), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "PingSourceReconciled", `PingSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &brokerDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - // Status Update: - WithInitPingSourceConditions, - WithValidPingSourceSchedule, - WithValidPingSourceResources, - WithPingSourceDeployed, - WithPingSourceEventType, - WithPingSourceSink(sinkURI), - WithPingSourceStatusObservedGeneration(generation), - ), - }}, - WantCreates: []runtime.Object{ - NewEventType(eventTypeName, testNS, - WithEventTypeLabels(resources.Labels(sourceName)), - WithEventTypeType(sourcesv1alpha1.PingSourceEventType), - WithEventTypeSource(sourcesv1alpha1.PingSourceSource(testNS, sourceName)), - WithEventTypeBroker(sinkName), - WithEventTypeOwnerReference(ownerRef)), - }, - }, { - Name: "valid with event type deletion and creation", - Objects: []runtime.Object{ - NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &brokerDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - ), - NewBroker(sinkName, testNS, - WithInitBrokerConditions, - WithBrokerAddress(sinkDNS), - ), - NewEventType(eventTypeName, testNS, - WithEventTypeLabels(resources.Labels(sourceName)), - WithEventTypeType("type-1"), - WithEventTypeSource(sourcesv1alpha1.PingSourceSource(testNS, sourceName)), - WithEventTypeBroker(sinkName), - WithEventTypeOwnerReference(ownerRef)), - makeAvailableReceiveAdapter(brokerDest), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "PingSourceReconciled", `PingSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &brokerDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - // Status Update: - WithInitPingSourceConditions, - WithValidPingSourceSchedule, - WithValidPingSourceResources, - WithPingSourceDeployed, - WithPingSourceEventType, - WithPingSourceSink(sinkURI), - WithPingSourceStatusObservedGeneration(generation), - ), - }}, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: eventTypeName, - }}, - WantCreates: []runtime.Object{ - NewEventType(eventTypeName, testNS, - WithEventTypeLabels(resources.Labels(sourceName)), - WithEventTypeType(sourcesv1alpha1.PingSourceEventType), - WithEventTypeSource(sourcesv1alpha1.PingSourceSource(testNS, sourceName)), - WithEventTypeBroker(sinkName), - WithEventTypeOwnerReference(ownerRef)), - }, }, { Name: "valid, existing ra", Objects: []runtime.Object{ @@ -447,65 +336,6 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeNormal, "PingSourceReconciled", `PingSource reconciled: "%s/%s"`, testNS, sourceName), }, - }, { - Name: "valid with event type deletion", - Objects: []runtime.Object{ - NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &sinkDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - WithInitPingSourceConditions, - WithValidPingSourceSchedule, - WithValidPingSourceResources, - WithValidPingSourceResources, - WithPingSourceDeployed, - WithPingSourceSink(sinkURI), - WithPingSourceEventType, - ), - NewChannel(sinkName, testNS, - WithInitChannelConditions, - WithChannelAddress(sinkDNS), - ), - makeAvailableReceiveAdapter(sinkDest), - NewEventType(eventTypeName, testNS, - WithEventTypeLabels(resources.Labels(sourceName)), - WithEventTypeType(sourcesv1alpha1.PingSourceEventType), - WithEventTypeSource(sourcesv1alpha1.PingSourceSource(testNS, sourceName)), - WithEventTypeBroker(sinkName), - WithEventTypeOwnerReference(ownerRef)), - }, - Key: testNS + "/" + sourceName, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewPingSourceV1Alpha1(sourceName, testNS, - WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ - Schedule: testSchedule, - Data: testData, - Sink: &sinkDest, - }), - WithPingSourceResourceScopeAnnotation, - WithPingSourceUID(sourceUID), - WithPingSourceObjectMetaGeneration(generation), - // Status Update: - WithInitPingSourceConditions, - WithValidPingSourceSchedule, - WithValidPingSourceResources, - WithPingSourceDeployed, - WithPingSourceSink(sinkURI), - WithPingSourceEventType, - WithPingSourceStatusObservedGeneration(generation), - ), - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "PingSourceReconciled", `PingSource reconciled: "%s/%s"`, testNS, sourceName), - }, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: eventTypeName, - }}, }, { Name: "valid", Objects: []runtime.Object{ @@ -649,7 +479,6 @@ func TestAllCases(t *testing.T) { Base: reconciler.NewBase(ctx, controllerAgentName, cmw), pingLister: listers.GetPingSourceLister(), deploymentLister: listers.GetDeploymentLister(), - eventTypeLister: listers.GetEventTypeLister(), tracker: tracker.New(func(types.NamespacedName) {}, 0), receiveAdapterImage: image, jobRunnerImage: jobRunnerImage, diff --git a/pkg/reconciler/pingsource/controller/resources/eventtype.go b/pkg/reconciler/pingsource/controller/resources/eventtype.go deleted file mode 100644 index e9f04e16d38..00000000000 --- a/pkg/reconciler/pingsource/controller/resources/eventtype.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "knative.dev/pkg/kmeta" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/apis/sources/v1alpha1" - "knative.dev/eventing/pkg/utils" -) - -// MakeEventType creates the in-memory representation of the EventType for the specified PingSource. -func MakeEventType(src *v1alpha1.PingSource) *eventingv1alpha1.EventType { - return &eventingv1alpha1.EventType{ - ObjectMeta: metav1.ObjectMeta{ - Name: utils.GenerateFixedName(src, utils.ToDNS1123Subdomain(v1alpha1.PingSourceEventType)), - Labels: Labels(src.Name), - Namespace: src.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(src), - }, - }, - Spec: eventingv1alpha1.EventTypeSpec{ - Type: v1alpha1.PingSourceEventType, - Source: v1alpha1.PingSourceSource(src.Namespace, src.Name), - Broker: src.Spec.Sink.GetRef().Name, - }, - } -} diff --git a/pkg/reconciler/pingsource/controller/resources/eventtype_test.go b/pkg/reconciler/pingsource/controller/resources/eventtype_test.go deleted file mode 100644 index 6050b735fcd..00000000000 --- a/pkg/reconciler/pingsource/controller/resources/eventtype_test.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/sources/v1alpha1" - "knative.dev/eventing/pkg/utils" - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/kmeta" - - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" -) - -func TestMakeEventType(t *testing.T) { - src := &v1alpha1.PingSource{ - Spec: v1alpha1.PingSourceSpec{ - Schedule: "*/2 * * * *", - Sink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: "v1alpha1", - Kind: "broker", - Name: "default", - Namespace: "namespace", - }, - }, - }, - } - - want := &eventingv1alpha1.EventType{ - ObjectMeta: metav1.ObjectMeta{ - Name: utils.GenerateFixedName(src, utils.ToDNS1123Subdomain(v1alpha1.PingSourceEventType)), - Labels: Labels(src.Name), - Namespace: src.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(src), - }, - }, - Spec: eventingv1alpha1.EventTypeSpec{ - Type: v1alpha1.PingSourceEventType, - Source: v1alpha1.PingSourceSource(src.Namespace, src.Name), - Broker: src.Spec.Sink.GetRef().Name, - }, - } - - got := MakeEventType(src) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index a8cff223cfe..73fe757aa13 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -48,9 +47,6 @@ type Base struct { // EventingClientSet allows us to configure Eventing objects EventingClientSet clientset.Interface - // ApiExtensionsClientSet allows us to configure k8s API extension objects. - ApiExtensionsClientSet apiextensionsclientset.Interface - // DynamicClientSet allows us to configure pluggable Build objects DynamicClientSet dynamic.Interface diff --git a/pkg/reconciler/source/crd/controller.go b/pkg/reconciler/source/crd/controller.go new file mode 100644 index 00000000000..309e58858b4 --- /dev/null +++ b/pkg/reconciler/source/crd/controller.go @@ -0,0 +1,63 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package crd + +import ( + "context" + + "knative.dev/eventing/pkg/apis/sources" + "knative.dev/eventing/pkg/reconciler" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + pkgreconciler "knative.dev/pkg/reconciler" + + "k8s.io/client-go/tools/cache" + crdinfomer "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1beta1/customresourcedefinition" +) + +const ( + // ReconcilerName is the name of the reconciler. + ReconcilerName = "SourceCRDs" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "source-crds-controller" +) + +// NewController creates a Reconciler for Sources' CRDs and returns the result of NewImpl. +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + + crdInformer := crdinfomer.Get(ctx) + + r := &Reconciler{ + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + crdLister: crdInformer.Lister(), + ogctx: ctx, + ogcmw: cmw, + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName) + + r.Logger.Info("Setting up event handlers") + crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: pkgreconciler.LabelFilterFunc(sources.SourceDuckAnnotationKey, sources.SourceDuckAnnotationValue, false), + Handler: controller.HandleAll(impl.Enqueue), + }) + + return impl +} diff --git a/pkg/reconciler/source/crd/crd.go b/pkg/reconciler/source/crd/crd.go new file mode 100644 index 00000000000..ee1c09fbce8 --- /dev/null +++ b/pkg/reconciler/source/crd/crd.go @@ -0,0 +1,215 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package crd + +import ( + "context" + "fmt" + "sync" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/logging" + "knative.dev/eventing/pkg/reconciler/source/duck" + "knative.dev/pkg/configmap" + + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/controller" + + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1" + "knative.dev/eventing/pkg/reconciler" +) + +const ( + // Name of the corev1.Events emitted from the Source CRDs reconciliation process. + sourceCRDReconcileFailed = "SourceCRDReconcileFailed" +) + +type runningController struct { + controller *controller.Impl + cancel context.CancelFunc +} + +// Reconciler implements controller.Reconciler for Source CRDs resources. +type Reconciler struct { + *reconciler.Base + + // Listers index properties about resources + crdLister apiextensionsv1beta1.CustomResourceDefinitionLister + + ogctx context.Context + ogcmw configmap.Watcher + + // controllers keeps a map for GVR to dynamically created controllers. + controllers map[schema.GroupVersionResource]runningController + + // Synchronization primitives + lock sync.RWMutex + onlyOnce sync.Once +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*Reconciler)(nil) + +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + + // Create controllers map only once. + r.onlyOnce.Do(func() { + r.controllers = make(map[schema.GroupVersionResource]runningController) + }) + + // Convert the namespace/name string into a distinct namespace and name. + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + logging.FromContext(ctx).Error("invalid resource key") + return nil + } + + // Get the CRD resource with this name. + original, err := r.crdLister.Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("CRD key in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy. + crd := original.DeepCopy() + + reconcileErr := r.reconcile(ctx, crd) + if reconcileErr != nil { + r.Recorder.Eventf(crd, corev1.EventTypeWarning, sourceCRDReconcileFailed, "Source CRD reconciliation failed: %v", reconcileErr) + } + // Requeue if the reconcile failed. + return reconcileErr +} + +func (r *Reconciler) reconcile(ctx context.Context, crd *v1beta1.CustomResourceDefinition) error { + // The reconciliation process is as follows: + // 1. Resolve GVR and GVK from a particular Source CRD (i.e., those labeled with duck.knative.dev/source = "true") + // 2. Dynamically create a controller for it, if not present already. Such controller is in charge of reconciling + // duckv1.Source resources with that particular GVR.. + + gvr, gvk, err := r.resolveGroupVersions(ctx, crd) + if err != nil { + logging.FromContext(ctx).Error("Error while resolving GVR and GVK", zap.String("CRD", crd.Name), zap.Error(err)) + return err + } + + if !crd.DeletionTimestamp.IsZero() { + // We are intentionally not setting up a finalizer on the CRD. + // This might leave unnecessary dynamic controllers running. + // This is a best effort to try to clean them up. + // Note that without a finalizer there is no guarantee we will be called. + r.deleteController(ctx, gvr) + return nil + } + + err = r.reconcileController(ctx, crd, gvr, gvk) + if err != nil { + logging.FromContext(ctx).Error("Error while reconciling controller", zap.String("GVR", gvr.String()), zap.String("GVK", gvk.String()), zap.Error(err)) + return err + } + + return nil +} + +func (r *Reconciler) resolveGroupVersions(ctx context.Context, crd *v1beta1.CustomResourceDefinition) (*schema.GroupVersionResource, *schema.GroupVersionKind, error) { + var gvr *schema.GroupVersionResource + var gvk *schema.GroupVersionKind + for _, v := range crd.Spec.Versions { + if !v.Served { + continue + } + gvr = &schema.GroupVersionResource{ + Group: crd.Spec.Group, + Version: v.Name, + Resource: crd.Spec.Names.Plural, + } + + gvk = &schema.GroupVersionKind{ + Group: crd.Spec.Group, + Version: v.Name, + Kind: crd.Spec.Names.Kind, + } + + } + if gvr == nil || gvk == nil { + return nil, nil, fmt.Errorf("unable to find GVR or GVK for %s", crd.Name) + } + return gvr, gvk, nil +} + +func (r *Reconciler) deleteController(ctx context.Context, gvr *schema.GroupVersionResource) { + r.lock.RLock() + rc, found := r.controllers[*gvr] + r.lock.RUnlock() + if found { + r.lock.Lock() + // Now that we grabbed the write lock, check that nobody deleted it already. + rc, found = r.controllers[*gvr] + if found { + logging.FromContext(ctx).Info("Stopping Source Duck Controller", zap.String("GVR", gvr.String())) + rc.cancel() + delete(r.controllers, *gvr) + } + r.lock.Unlock() + } +} + +func (r *Reconciler) reconcileController(ctx context.Context, crd *v1beta1.CustomResourceDefinition, gvr *schema.GroupVersionResource, gvk *schema.GroupVersionKind) error { + r.lock.RLock() + rc, found := r.controllers[*gvr] + r.lock.RUnlock() + if found { + return nil + } + + r.lock.Lock() + defer r.lock.Unlock() + // Now that we grabbed the write lock, check that nobody has created the controller. + rc, found = r.controllers[*gvr] + if found { + return nil + } + + // Source Duck controller constructor + sdc := duck.NewController(crd.Name, *gvr, *gvk) + // Source Duck controller context + sdctx, cancel := context.WithCancel(r.ogctx) + // Source Duck controller instantiation + sd := sdc(sdctx, r.ogcmw) + + rc = runningController{ + controller: sd, + cancel: cancel, + } + r.controllers[*gvr] = rc + + logging.FromContext(ctx).Info("Starting Source Duck Controller", zap.String("GVR", gvr.String()), zap.String("GVK", gvk.String())) + go func(c *controller.Impl) { + if err := c.Run(controller.DefaultThreadsPerController, sdctx.Done()); err != nil { + logging.FromContext(ctx).Error("Unable to start Source Duck Controller", zap.String("GVR", gvr.String()), zap.String("GVK", gvk.String())) + } + }(rc.controller) + return nil +} diff --git a/pkg/reconciler/source/duck/controller.go b/pkg/reconciler/source/duck/controller.go new file mode 100644 index 00000000000..09266d9cd5a --- /dev/null +++ b/pkg/reconciler/source/duck/controller.go @@ -0,0 +1,91 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package duck + +import ( + "context" + + "go.uber.org/zap" + "k8s.io/client-go/tools/cache" + + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/pkg/reconciler" + "knative.dev/pkg/apis/duck" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/injection" + "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/logging" + + eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" + duckv1 "knative.dev/pkg/apis/duck/v1" + crdinfomer "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1beta1/customresourcedefinition" +) + +const ( + // ReconcilerName is the name of the reconciler. + ReconcilerName = "SourceDucks" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "source-duck-controller" +) + +// NewController returns a function that initializes the controller and +// Registers event handlers to enqueue events +func NewController(crd string, gvr schema.GroupVersionResource, gvk schema.GroupVersionKind) injection.ControllerConstructor { + return func(ctx context.Context, + cmw configmap.Watcher, + ) *controller.Impl { + + eventTypeInformer := eventtypeinformer.Get(ctx) + crdInformer := crdinfomer.Get(ctx) + + // Create a duck TypedInformer for duckv1.Source resources. + sourceinformer := &duck.TypedInformerFactory{ + Client: dynamicclient.Get(ctx), + Type: &duckv1.Source{}, + ResyncPeriod: controller.DefaultResyncPeriod, + StopChannel: ctx.Done(), + } + + sourceInformer, sourceLister, err := sourceinformer.Get(gvr) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Error getting source informer", zap.String("GVR", gvr.String()), zap.Error(err)) + return nil + } + + r := &Reconciler{ + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + eventTypeLister: eventTypeInformer.Lister(), + crdLister: crdInformer.Lister(), + sourceLister: sourceLister, + gvr: gvr, + crdName: crd, + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName) + + r.Logger.Info("Setting up event handlers") + sourceInformer.AddEventHandler(controller.HandleAll(impl.Enqueue)) + + eventTypeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterGroupVersionKind(gvk), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + + return impl + } +} diff --git a/pkg/reconciler/source/duck/duck.go b/pkg/reconciler/source/duck/duck.go new file mode 100644 index 00000000000..a5f81351cab --- /dev/null +++ b/pkg/reconciler/source/duck/duck.go @@ -0,0 +1,257 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package duck + +import ( + "context" + "encoding/json" + "fmt" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/reconciler/source/duck/resources" + + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/reconciler" + + apierrs "k8s.io/apimachinery/pkg/api/errors" + listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "knative.dev/eventing/pkg/logging" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +type Reconciler struct { + *reconciler.Base + + // listers index properties about resources + eventTypeLister listers.EventTypeLister + crdLister apiextensionsv1beta1.CustomResourceDefinitionLister + sourceLister cache.GenericLister + + gvr schema.GroupVersionResource + crdName string +} + +// eventTypeEntry refers to an entry in the registry.knative.dev/eventTypes annotation. +type eventTypeEntry struct { + Type string `json:"type"` + Schema string `json:"schema,omitempty"` + Description string `json:"description,omitempty"` +} + +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + logging.FromContext(ctx).Error("invalid resource key") + return nil + } + + // Get the Source resource with this namespace/name + runtimeObj, err := r.sourceLister.ByNamespace(namespace).Get(name) + + var ok bool + var original *duckv1.Source + if original, ok = runtimeObj.(*duckv1.Source); !ok { + logging.FromContext(ctx).Error("runtime object is not convertible to Source duck type") + // Avoid re-enqueuing. + return nil + } + + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("Source in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy + orig := original.DeepCopy() + // Reconcile this copy of the Source. We do not control the Source, so do not update status. + return r.reconcile(ctx, orig) +} + +func (r *Reconciler) reconcile(ctx context.Context, source *duckv1.Source) error { + // Reconcile the eventTypes for this source. + err := r.reconcileEventTypes(ctx, source) + if err != nil { + logging.FromContext(ctx).Error("Error reconciling event types for Source") + return err + } + return nil +} + +// TODO revisit most of this logic once we get rid of Broker and maybe some other bits. +// https://github.com/knative/eventing/issues/2750. +func (r *Reconciler) reconcileEventTypes(ctx context.Context, src *duckv1.Source) error { + current, err := r.getEventTypes(ctx, src) + if err != nil { + logging.FromContext(ctx).Error("Unable to get existing event types", zap.Error(err)) + return err + } + + expected, err := r.makeEventTypes(ctx, src) + if err != nil { + return err + } + + toCreate, toDelete := r.computeDiff(current, expected) + + for _, eventType := range toDelete { + if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(eventType.Name, &metav1.DeleteOptions{}); err != nil { + logging.FromContext(ctx).Error("Error deleting eventType", zap.Any("eventType", eventType)) + return err + } + } + + for _, eventType := range toCreate { + if _, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(&eventType); err != nil { + logging.FromContext(ctx).Error("Error creating eventType", zap.Any("eventType", eventType)) + return err + } + } + + return nil +} + +func (r *Reconciler) getEventTypes(ctx context.Context, src *duckv1.Source) ([]v1alpha1.EventType, error) { + etl, err := r.eventTypeLister.EventTypes(src.Namespace).List(labels.SelectorFromSet(resources.Labels(src.Name))) + if err != nil { + logging.FromContext(ctx).Error("Unable to list event types: %v", zap.Error(err)) + return nil, err + } + eventTypes := make([]v1alpha1.EventType, 0) + for _, et := range etl { + if metav1.IsControlledBy(et, src) { + eventTypes = append(eventTypes, *et) + } + } + return eventTypes, nil +} + +func (r *Reconciler) makeEventTypes(ctx context.Context, src *duckv1.Source) ([]v1alpha1.EventType, error) { + // Only create EventTypes for Broker sinks. + // We add this check here in case the Source was changed from Broker to non-Broker sink. + // If so, we need to delete the existing ones, thus we return empty expected. + // TODO remove broker from EventType https://github.com/knative/eventing/issues/2750 + if ref := src.Spec.Sink.GetRef(); ref == nil || ref.Kind != "Broker" { + return make([]v1alpha1.EventType, 0), nil + } + + // If the Source didn't specify a CloudEventsAttributes, then we skip the creation of EventTypes. + // TODO might change in the near future https://github.com/knative/eventing/issues/2750. + if src.Status.CloudEventAttributes == nil { + return make([]v1alpha1.EventType, 0), nil + } + + entries := make(map[string]eventTypeEntry, 0) + // Get the description and schema from the CRD, in case they are there. + // The CRD annotation has the types as well. But given that different Sources have their own configurations, I'm just + // grabbing the description and schema from the CRD, using the type as "primary key". + // By having their own configs I mean, for example, in the GithubSource + // you can specify the subset of event types you are interested in, or in the PingSource you just have + // one type, and so on. + crd, err := r.crdLister.Get(r.crdName) + if err != nil { + // Only log, can create the EventType(s) without this info. + logging.FromContext(ctx).Error("Error getting CRD for Source", zap.Any("src", src)) + } else { + var ets []eventTypeEntry + if v, ok := crd.Annotations[eventing.EventTypesAnnotationKey]; ok { + if err := json.Unmarshal([]byte(v), &ets); err != nil { + // Same here, only log, can create the EventType(s) without this info. + logging.FromContext(ctx).Error("Error unmarshalling EventTypes", zap.String("annotation", eventing.EventTypesAnnotationKey), zap.Error(err)) + } + } + if ets != nil { + for _, et := range ets { + entries[et.Type] = et + } + } + } + + eventTypes := make([]v1alpha1.EventType, 0) + for _, attrib := range src.Status.CloudEventAttributes { + if attrib.Type == "" { + // Cannot have empty spec.type + continue + } + var schema, description string + if v, ok := entries[attrib.Type]; ok { + schema = v.Schema + description = v.Description + } + eventType := resources.MakeEventType(&resources.EventTypeArgs{ + Source: src, + CeType: attrib.Type, + CeSource: attrib.Source, + CeSchema: schema, + Description: description, + }) + eventTypes = append(eventTypes, *eventType) + } + return eventTypes, nil +} + +func (r *Reconciler) computeDiff(current []v1alpha1.EventType, expected []v1alpha1.EventType) ([]v1alpha1.EventType, []v1alpha1.EventType) { + toCreate := make([]v1alpha1.EventType, 0) + toDelete := make([]v1alpha1.EventType, 0) + currentMap := asMap(current, keyFromEventType) + expectedMap := asMap(expected, keyFromEventType) + + // Iterate over the slices instead of the maps for predictable UT expectations. + for _, e := range expected { + if c, ok := currentMap[keyFromEventType(&e)]; !ok { + toCreate = append(toCreate, e) + } else { + if !equality.Semantic.DeepEqual(e.Spec, c.Spec) { + toDelete = append(toDelete, c) + toCreate = append(toCreate, e) + } + } + } + // Need to check whether the current EventTypes are not in the expected map. If so, we have to delete them. + // This could happen if the Source CO changes its broker. + // TODO remove once we remove Broker https://github.com/knative/eventing/issues/2750 + for _, c := range current { + if _, ok := expectedMap[keyFromEventType(&c)]; !ok { + toDelete = append(toDelete, c) + } + } + return toCreate, toDelete +} + +func asMap(eventTypes []v1alpha1.EventType, keyFunc func(*v1alpha1.EventType) string) map[string]v1alpha1.EventType { + eventTypesAsMap := make(map[string]v1alpha1.EventType, 0) + for _, eventType := range eventTypes { + key := keyFunc(&eventType) + eventTypesAsMap[key] = eventType + } + return eventTypesAsMap +} + +// TODO we should probably use the hash of this instead. Will be revisited together with https://github.com/knative/eventing/issues/2750. +func keyFromEventType(eventType *v1alpha1.EventType) string { + return fmt.Sprintf("%s_%s_%s_%s", eventType.Spec.Type, eventType.Spec.Source, eventType.Spec.Schema, eventType.Spec.Broker) +} diff --git a/pkg/reconciler/source/duck/resources/eventtype.go b/pkg/reconciler/source/duck/resources/eventtype.go new file mode 100644 index 00000000000..4dfdbfc0d9e --- /dev/null +++ b/pkg/reconciler/source/duck/resources/eventtype.go @@ -0,0 +1,69 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "crypto/md5" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/ptr" +) + +// EventTypeArgs are the arguments needed to create an EventType for a Source. +type EventTypeArgs struct { + Source *duckv1.Source + CeType string + CeSource string + CeSchema string + Description string +} + +func MakeEventType(args *EventTypeArgs) *v1alpha1.EventType { + // Name it with the hash of the concatenation of the three fields. + // Cannot generate a fixed name based on type+UUID, because long type names might be cut, and we end up trying to create + // event types with the same name. + // TODO revisit whether we want multiple event types, or just one with multiple owner refs. That will depend on the fields + // it will contain. For example, if we remove Broker and Source, then the latter makes more sense. + // See https://github.com/knative/eventing/issues/2750 + fixedName := fmt.Sprintf("%x", md5.Sum([]byte(args.CeType+args.CeSource+args.CeSchema+string(args.Source.GetUID())))) + return &v1alpha1.EventType{ + ObjectMeta: metav1.ObjectMeta{ + Name: fixedName, + Labels: Labels(args.Source.Name), + Namespace: args.Source.Namespace, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: args.Source.APIVersion, + Kind: args.Source.Kind, + Name: args.Source.Name, + UID: args.Source.UID, + BlockOwnerDeletion: ptr.Bool(true), + Controller: ptr.Bool(true), + }}, + }, + Spec: v1alpha1.EventTypeSpec{ + Type: args.CeType, + Source: args.CeSource, + // TODO remove broker https://github.com/knative/eventing/issues/2750 + Broker: args.Source.Spec.Sink.GetRef().Name, + Description: args.Description, + Schema: args.CeSchema, + }, + } +} diff --git a/pkg/reconciler/source/duck/resources/labels.go b/pkg/reconciler/source/duck/resources/labels.go new file mode 100644 index 00000000000..bb6366cbb12 --- /dev/null +++ b/pkg/reconciler/source/duck/resources/labels.go @@ -0,0 +1,23 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +func Labels(name string) map[string]string { + return map[string]string{ + "eventing.knative.dev/sourceName": name, + } +} diff --git a/pkg/reconciler/testing/apiserversource.go b/pkg/reconciler/testing/apiserversource.go index 5600b6ec33c..da48784f9d1 100644 --- a/pkg/reconciler/testing/apiserversource.go +++ b/pkg/reconciler/testing/apiserversource.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/types" "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/pkg/utils" + duckv1 "knative.dev/pkg/apis/duck/v1" ) // ApiServerSourceOption enables further configuration of a ApiServer. @@ -81,8 +82,17 @@ func WithApiServerSourceDeployed(s *v1alpha1.ApiServerSource) { s.Status.PropagateDeploymentAvailability(NewDeployment("any", "any", WithDeploymentAvailable())) } -func WithApiServerSourceEventTypes(s *v1alpha1.ApiServerSource) { - s.Status.MarkEventTypes() +func WithApiServerSourceEventTypes(source string) ApiServerSourceOption { + return func(s *v1alpha1.ApiServerSource) { + ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(v1alpha1.ApiServerSourceEventTypes)) + for _, apiServerSourceType := range v1alpha1.ApiServerSourceEventTypes { + ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{ + Type: apiServerSourceType, + Source: source, + }) + } + s.Status.CloudEventAttributes = ceAttributes + } } func WithApiServerSourceSufficientPermissions(s *v1alpha1.ApiServerSource) { diff --git a/pkg/reconciler/testing/pingsource.go b/pkg/reconciler/testing/pingsource.go index 0437981fd06..61bdb450914 100644 --- a/pkg/reconciler/testing/pingsource.go +++ b/pkg/reconciler/testing/pingsource.go @@ -25,6 +25,7 @@ import ( "knative.dev/eventing/pkg/apis/sources/v1alpha2" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -157,7 +158,10 @@ func WithPingSourceV1A2Deployed(s *v1alpha2.PingSource) { } func WithPingSourceEventType(s *v1alpha1.PingSource) { - s.Status.MarkEventType() + s.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{{ + Type: v1alpha1.PingSourceEventType, + Source: v1alpha1.PingSourceSource(s.Namespace, s.Name), + }} } func WithPingSourceV1A2EventType(s *v1alpha2.PingSource) { diff --git a/test/e2e/source_api_server_test.go b/test/e2e/source_api_server_test.go index 2c7ca9ce361..9f258c1d92d 100644 --- a/test/e2e/source_api_server_test.go +++ b/test/e2e/source_api_server_test.go @@ -25,8 +25,10 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" + pkgResources "knative.dev/eventing/pkg/reconciler/namespace/resources" eventingtesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" @@ -182,3 +184,76 @@ func TestApiServerSource(t *testing.T) { } } } + +func TestApiServerSourceV1Alpha2EventTypes(t *testing.T) { + const ( + sourceName = "e2e-apiserver-source-eventtypes" + serviceAccountName = "event-watcher-sa" + roleName = "event-watcher-r" + ) + + client := setup(t, true) + defer tearDown(client) + + // creates ServiceAccount and RoleBinding with a role for reading pods and events + r := resources.Role(roleName, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events", "pods"}, + Verbs: []string{"get", "list", "watch"}})) + client.CreateServiceAccountOrFail(serviceAccountName) + client.CreateRoleOrFail(r) + client.CreateRoleBindingOrFail( + serviceAccountName, + lib.RoleKind, + roleName, + fmt.Sprintf("%s-%s", serviceAccountName, roleName), + client.Namespace, + ) + + // Label namespace so that it creates the default broker. + if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { + t.Fatalf("Error annotating namespace: %v", err) + } + + // Wait for default broker ready. + client.WaitForResourceReadyOrFail(pkgResources.DefaultBrokerName, lib.BrokerTypeMeta) + + // Create the api server source + apiServerSource := eventingtesting.NewApiServerSource( + sourceName, + client.Namespace, + eventingtesting.WithApiServerSourceSpec( + sourcesv1alpha1.ApiServerSourceSpec{ + Resources: []sourcesv1alpha1.ApiServerResource{{ + APIVersion: "v1", + Kind: "Event", + }}, + Mode: "Ref", + ServiceAccountName: serviceAccountName, + // TODO change sink to be a non-Broker one once we revisit EventType https://github.com/knative/eventing/issues/2750 + Sink: &duckv1beta1.Destination{Ref: &corev1.ObjectReference{APIVersion: "eventing.knative.dev/v1alpha1", Kind: "Broker", Name: pkgResources.DefaultBrokerName, Namespace: client.Namespace}}, + }), + ) + + client.CreateApiServerSourceOrFail(apiServerSource) + + // wait for all test resources to be ready + client.WaitForAllTestResourcesReadyOrFail() + + // verify that EventTypes were created. + eventTypes, err := client.Eventing.EventingV1alpha1().EventTypes(client.Namespace).List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Error retrieving EventTypes: %v", err) + } + if len(eventTypes.Items) != len(sourcesv1alpha1.ApiServerSourceEventTypes) { + t.Fatalf("Invalid number of EventTypes registered for ApiServerSource: %s/%s, expected: %d, got: %d", client.Namespace, sourceName, len(sourcesv1alpha1.ApiServerSourceEventTypes), len(eventTypes.Items)) + } + + expectedCeTypes := sets.NewString(sourcesv1alpha1.ApiServerSourceEventTypes...) + for _, et := range eventTypes.Items { + if !expectedCeTypes.Has(et.Spec.Type) { + t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sourcesv1alpha1.ApiServerSourceEventTypes, et.Spec.Type) + } + } +} diff --git a/test/e2e/source_ping_test.go b/test/e2e/source_ping_test.go index 1e4ecf12d4f..f714e6388b0 100644 --- a/test/e2e/source_ping_test.go +++ b/test/e2e/source_ping_test.go @@ -22,7 +22,9 @@ import ( "testing" sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2" + pkgResources "knative.dev/eventing/pkg/reconciler/namespace/resources" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/test/lib" @@ -151,3 +153,55 @@ func TestPingSourceV1Alpha2ResourceScope(t *testing.T) { t.Fatalf("String %q not found or found multiple times in logs of logger pod %q: %v", data, loggerPodName, err) } } + +func TestPingSourceV1Alpha2EventTypes(t *testing.T) { + const ( + sourceName = "e2e-ping-source-eventtype" + ) + + client := setup(t, true) + defer tearDown(client) + + // Label namespace so that it creates the default broker. + if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { + t.Fatalf("Error annotating namespace: %v", err) + } + + // Wait for default broker ready. + client.WaitForResourceReadyOrFail(pkgResources.DefaultBrokerName, lib.BrokerTypeMeta) + + // Create ping source + data := fmt.Sprintf("TestPingSource %s", uuid.NewUUID()) + source := eventingtesting.NewPingSourceV1Alpha2( + sourceName, + client.Namespace, + eventingtesting.WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{ + JsonData: data, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + // TODO change sink to be a non-Broker one once we revisit EventType https://github.com/knative/eventing/issues/2750 + Ref: resources.KnativeRefForBroker(defaultBrokerName, client.Namespace), + }, + }, + }), + ) + client.CreatePingSourceV1Alpha2OrFail(source) + + // wait for all test resources to be ready + client.WaitForAllTestResourcesReadyOrFail() + + // verify that an EventType was created. + eventTypes, err := client.Eventing.EventingV1alpha1().EventTypes(client.Namespace).List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Error retrieving EventTypes: %v", err) + } + if len(eventTypes.Items) != 1 { + t.Fatalf("Invalid number of EventTypes registered for PingSource: %s/%s, expected 1, got %d", client.Namespace, sourceName, len(eventTypes.Items)) + } + et := eventTypes.Items[0] + if et.Spec.Type != sourcesv1alpha2.PingSourceEventType && et.Spec.Source != sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName) { + t.Fatalf("Invalid spec.type and/or spec.source for PingSource EventType, expected: type=%s source=%s, got: type=%s source=%s", + sourcesv1alpha2.PingSourceEventType, sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName), et.Spec.Type, et.Spec.Source) + } + +} diff --git a/test/lib/resources/eventing.go b/test/lib/resources/eventing.go index c65ba9b8294..4a9a64366db 100644 --- a/test/lib/resources/eventing.go +++ b/test/lib/resources/eventing.go @@ -72,6 +72,15 @@ func KnativeRefForService(name, namespace string) *duckv1.KReference { } } +func KnativeRefForBroker(name, namespace string) *duckv1.KReference { + return &duckv1.KReference{ + Kind: "Broker", + APIVersion: "eventing.knative.dev/v1alpha1", + Name: name, + Namespace: namespace, + } +} + // WithSubscriberForSubscription returns an option that adds a Subscriber for the given Subscription. func WithSubscriberForSubscription(name string) SubscriptionOption { return func(s *messagingv1alpha1.Subscription) { diff --git a/vendor/knative.dev/pkg/apis/duck/v1/source_types.go b/vendor/knative.dev/pkg/apis/duck/v1/source_types.go index 782556471ee..3b378ca3d48 100644 --- a/vendor/knative.dev/pkg/apis/duck/v1/source_types.go +++ b/vendor/knative.dev/pkg/apis/duck/v1/source_types.go @@ -81,6 +81,22 @@ type SourceStatus struct { // Source. // +optional SinkURI *apis.URL `json:"sinkUri,omitempty"` + + // CloudEventAttributes are the specific attributes that the Source uses + // as part of its CloudEvents. + // +optional + CloudEventAttributes []CloudEventAttributes `json:"ceAttributes,omitempty"` +} + +// CloudEventAttributes specifies the attributes that a Source +// uses as part of its CloudEvents. +type CloudEventAttributes struct { + + // Type refers to the CloudEvent type attribute. + Type string `json:"type,omitempty"` + + // Source is the CloudEvents source attribute. + Source string `json:"source,omitempty"` } // IsReady returns true if the resource is ready overall. @@ -137,6 +153,10 @@ func (s *Source) Populate() { Host: "tableflip.dev", RawQuery: "flip=mattmoor", } + s.Status.CloudEventAttributes = []CloudEventAttributes{{ + Type: "dev.knative.foo", + Source: "http://knative.dev/knative/eventing", + }} } // GetListType implements apis.Listable diff --git a/vendor/knative.dev/pkg/apis/duck/v1/zz_generated.deepcopy.go b/vendor/knative.dev/pkg/apis/duck/v1/zz_generated.deepcopy.go index 2fcfbed776d..3397713be07 100644 --- a/vendor/knative.dev/pkg/apis/duck/v1/zz_generated.deepcopy.go +++ b/vendor/knative.dev/pkg/apis/duck/v1/zz_generated.deepcopy.go @@ -127,6 +127,22 @@ func (in *AddressableTypeList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CloudEventAttributes) DeepCopyInto(out *CloudEventAttributes) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CloudEventAttributes. +func (in *CloudEventAttributes) DeepCopy() *CloudEventAttributes { + if in == nil { + return nil + } + out := new(CloudEventAttributes) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CloudEventOverrides) DeepCopyInto(out *CloudEventOverrides) { *out = *in @@ -384,6 +400,11 @@ func (in *SourceStatus) DeepCopyInto(out *SourceStatus) { *out = new(apis.URL) (*in).DeepCopyInto(*out) } + if in.CloudEventAttributes != nil { + in, out := &in.CloudEventAttributes, &out.CloudEventAttributes + *out = make([]CloudEventAttributes, len(*in)) + copy(*out, *in) + } return }