diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index 60afacc783d..d6772c6d6cd 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -35,13 +35,6 @@ rules: - "patch" - "watch" - # Channels and Triggers both manipulate VirtualServices. - - apiGroups: - - "networking.istio.io" - resources: - - "virtualservices" - verbs: *everything - # Brokers and the namespace annotation controllers manipulate Deployments. - apiGroups: - "apps" diff --git a/contrib/gcppubsub/config/gcppubsub.yaml b/contrib/gcppubsub/config/gcppubsub.yaml index c05493180c8..a1807aa9851 100644 --- a/contrib/gcppubsub/config/gcppubsub.yaml +++ b/contrib/gcppubsub/config/gcppubsub.yaml @@ -61,16 +61,6 @@ rules: verbs: - create - update - - apiGroups: - - networking.istio.io - resources: - - virtualservices - verbs: - - get - - list - - watch - - create - - update - apiGroups: - "" # Core API Group. resources: @@ -240,15 +230,12 @@ spec: clusterChannelProvisioner: gcp-pubsub role: dispatcher ports: - - name: http - protocol: TCP + - protocol: TCP port: 80 targetPort: 8080 --- - -# Needed by the GCP PubSub Channel to communicate with GCP PubSub. - +# Needed by the GCP PubSub Channel to communicate with GCP PubSub. apiVersion: networking.istio.io/v1alpha3 kind: ServiceEntry metadata: diff --git a/contrib/gcppubsub/pkg/controller/channel/controller.go b/contrib/gcppubsub/pkg/controller/channel/controller.go index 847419fd60c..3248d2bbaed 100644 --- a/contrib/gcppubsub/pkg/controller/channel/controller.go +++ b/contrib/gcppubsub/pkg/controller/channel/controller.go @@ -19,7 +19,6 @@ package channel import ( pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -74,15 +73,6 @@ func ProvideController(defaultGcpProject string, defaultSecret *corev1.ObjectRef return nil, err } - // Watch the VirtualServices that are owned by Channels. - err = c.Watch(&source.Kind{ - Type: &istiov1alpha3.VirtualService{}, - }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) - if err != nil { - logger.Error("Unable to watch VirtualServices.", zap.Error(err)) - return nil, err - } - return c, nil } } diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile.go b/contrib/gcppubsub/pkg/controller/channel/reconcile.go index aa1dc66c7be..42c9d5c6522 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile.go @@ -19,10 +19,10 @@ package channel import ( "context" "fmt" - "github.com/knative/eventing/pkg/apis/duck/v1alpha1" ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" + "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" util "github.com/knative/eventing/pkg/provisioners" @@ -49,21 +49,20 @@ const ( noNeedToPersist // Name of the corev1.Events emitted from the reconciliation process - channelReconciled = "ChannelReconciled" - channelUpdateStatusFailed = "ChannelUpdateStatusFailed" - channelReadStatusFailed = "ChannelReadStatusFailed" - gcpCredentialsReadFailed = "GcpCredentialsReadFailed" - gcpResourcesPlanFailed = "GcpResourcesPlanFailed" - gcpResourcesPersistFailed = "GcpResourcesPersistFailed" - virtualServiceCreateFailed = "VirtualServiceCreateFailed" - k8sServiceCreateFailed = "K8sServiceCreateFailed" - topicCreateFailed = "TopicCreateFailed" - topicDeleteFailed = "TopicDeleteFailed" - subscriptionSyncFailed = "SubscriptionSyncFailed" - subscriptionDeleteFailed = "SubscriptionDeleteFailed" + channelReconciled = "ChannelReconciled" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" + channelReadStatusFailed = "ChannelReadStatusFailed" + gcpCredentialsReadFailed = "GcpCredentialsReadFailed" + gcpResourcesPlanFailed = "GcpResourcesPlanFailed" + gcpResourcesPersistFailed = "GcpResourcesPersistFailed" + k8sServiceCreateFailed = "K8sServiceCreateFailed" + topicCreateFailed = "TopicCreateFailed" + topicDeleteFailed = "TopicDeleteFailed" + subscriptionSyncFailed = "SubscriptionSyncFailed" + subscriptionDeleteFailed = "SubscriptionDeleteFailed" ) -// reconciler reconciles GCP-PubSub Channels by creating the K8s Service and Istio VirtualService +// reconciler reconciles GCP-PubSub Channels by creating the K8s Service (ExternalName) // allowing other processes to send data to them. It also creates the GCP PubSub Topics (one per // Channel) and GCP PubSub Subscriptions (one per Subscriber). type reconciler struct { @@ -116,7 +115,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } // Does this Controller control this Channel? - if !r.shouldReconcile(c) { + if !ShouldReconcile(c) { logging.FromContext(ctx).Info("Not reconciling Channel, it is not controlled by this Controller", zap.Any("ref", c.Spec)) return reconcile.Result{}, nil } @@ -147,9 +146,9 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err }, reconcileErr } -// shouldReconcile determines if this Controller should control (and therefore reconcile) a given +// ShouldReconcile determines if this Controller should control (and therefore reconcile) a given // Channel. This Controller only handles gcp-pubsub channels. -func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { +func ShouldReconcile(c *eventingv1alpha1.Channel) bool { if c.Spec.Provisioner != nil { return ccpcontroller.IsControlled(c.Spec.Provisioner) } @@ -162,11 +161,10 @@ func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) (bool, error) { c.Status.InitializeConditions() - // We are syncing four things: - // 1. The K8s Service to talk to this Channel. - // 2. The Istio VirtualService to talk to this Channel. - // 3. The GCP PubSub Topic (one for the Channel). - // 4. The GCP PubSub Subscriptions (one for each Subscriber of the Channel). + // We are syncing the following: + // - The K8s Service to talk to this Channel. + // - The GCP PubSub Topic (one for the Channel). + // - The GCP PubSub Subscriptions (one for each Subscriber of the Channel). // First we will plan all the names out for steps 3 and 4 persist them to status.internal. Then, on a // subsequent reconcile, we manipulate all the GCP resources in steps 3 and 4. @@ -187,7 +185,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } if c.DeletionTimestamp != nil { - // K8s garbage collection will delete the K8s service and VirtualService for this channel. + // K8s garbage collection will delete the K8s service for this channel. // All the subs should be deleted. subsToSync := &syncSubs{ subsToDelete: originalPCS.Subscriptions, @@ -232,18 +230,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) return true, nil } - svc, err := r.createK8sService(ctx, c) + _, err = r.createK8sService(ctx, c) if err != nil { r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) return false, err } - err = r.createVirtualService(ctx, c, svc) - if err != nil { - r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) - return false, err - } - topic, err := r.createTopic(ctx, plannedPCS, gcpCreds) if err != nil { r.recorder.Eventf(c, corev1.EventTypeWarning, topicCreateFailed, "Failed to reconcile Topic for the Channel: %v", err) @@ -360,7 +352,7 @@ func (r *reconciler) planGcpResources(ctx context.Context, c *eventingv1alpha1.C } func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) { - svc, err := util.CreateK8sService(ctx, r.client, c) + svc, err := util.CreateK8sService(ctx, r.client, c, util.ExternalService(c)) if err != nil { logging.FromContext(ctx).Info("Error creating the Channel's K8s Service", zap.Error(err)) return nil, err @@ -370,15 +362,6 @@ func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.C return svc, nil } -func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel, svc *corev1.Service) error { - _, err := util.CreateVirtualService(ctx, r.client, c, svc) - if err != nil { - logging.FromContext(ctx).Info("Error creating the Virtual Service for the Channel", zap.Error(err)) - return err - } - return nil -} - func (r *reconciler) createTopic(ctx context.Context, plannedPCS *pubsubutil.GcpPubSubChannelStatus, gcpCreds *google.Credentials) (pubsubutil.PubSubTopic, error) { psc, err := r.pubSubClientCreator(ctx, gcpCreds, plannedPCS.GCPProject) if err != nil { diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go index b041972563b..1a6dd3db175 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go @@ -22,31 +22,26 @@ import ( "fmt" "testing" - "k8s.io/apimachinery/pkg/types" - pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/testcreds" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/fakepubsub" - + "github.com/knative/eventing/contrib/gcppubsub/pkg/util/testcreds" + "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/utils" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/system" _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -92,18 +87,17 @@ var ( // map of events to set test cases' expectations easier events = map[string]corev1.Event{ - channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, - channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, - channelReadStatusFailed: {Reason: channelReadStatusFailed, Type: corev1.EventTypeWarning}, - gcpCredentialsReadFailed: {Reason: gcpCredentialsReadFailed, Type: corev1.EventTypeWarning}, - gcpResourcesPlanFailed: {Reason: gcpResourcesPlanFailed, Type: corev1.EventTypeWarning}, - gcpResourcesPersistFailed: {Reason: gcpResourcesPersistFailed, Type: corev1.EventTypeWarning}, - virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning}, - k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, - topicCreateFailed: {Reason: topicCreateFailed, Type: corev1.EventTypeWarning}, - topicDeleteFailed: {Reason: topicDeleteFailed, Type: corev1.EventTypeWarning}, - subscriptionSyncFailed: {Reason: subscriptionSyncFailed, Type: corev1.EventTypeWarning}, - subscriptionDeleteFailed: {Reason: subscriptionDeleteFailed, Type: corev1.EventTypeWarning}, + channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, + channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, + channelReadStatusFailed: {Reason: channelReadStatusFailed, Type: corev1.EventTypeWarning}, + gcpCredentialsReadFailed: {Reason: gcpCredentialsReadFailed, Type: corev1.EventTypeWarning}, + gcpResourcesPlanFailed: {Reason: gcpResourcesPlanFailed, Type: corev1.EventTypeWarning}, + gcpResourcesPersistFailed: {Reason: gcpResourcesPersistFailed, Type: corev1.EventTypeWarning}, + k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, + topicCreateFailed: {Reason: topicCreateFailed, Type: corev1.EventTypeWarning}, + topicDeleteFailed: {Reason: topicDeleteFailed, Type: corev1.EventTypeWarning}, + subscriptionSyncFailed: {Reason: subscriptionSyncFailed, Type: corev1.EventTypeWarning}, + subscriptionDeleteFailed: {Reason: subscriptionDeleteFailed, Type: corev1.EventTypeWarning}, } ) @@ -111,7 +105,6 @@ func init() { // Add types to scheme. eventingv1alpha1.AddToScheme(scheme.Scheme) corev1.AddToScheme(scheme.Scheme) - istiov1alpha3.AddToScheme(scheme.Scheme) } func TestInjectClient(t *testing.T) { @@ -482,62 +475,6 @@ func TestReconcile(t *testing.T) { events[k8sServiceCreateFailed], }, }, - { - Name: "Virtual service get fails", - InitialState: []runtime.Object{ - makeChannelWithFinalizerAndPCS(), - makeK8sService(), - makeVirtualService(), - testcreds.MakeSecretWithCreds(), - }, - Mocks: controllertesting.Mocks{ - MockLists: errorListingVirtualService(), - }, - WantPresent: []runtime.Object{ - // TODO: This should have a useful error message saying that the VirtualService - // failed. - makeChannelWithFinalizerAndPCSAndAddress(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[virtualServiceCreateFailed], - }, - }, - { - Name: "Virtual service creation fails", - InitialState: []runtime.Object{ - makeChannelWithFinalizerAndPCS(), - makeK8sService(), - testcreds.MakeSecretWithCreds(), - }, - Mocks: controllertesting.Mocks{ - MockCreates: errorCreatingVirtualService(), - }, - WantPresent: []runtime.Object{ - // TODO: This should have a useful error message saying that the VirtualService - // failed. - makeChannelWithFinalizerAndPCSAndAddress(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[virtualServiceCreateFailed], - }, - }, - { - Name: "VirtualService already exists - not owned by Channel", - InitialState: []runtime.Object{ - makeChannelWithFinalizerAndPCS(), - makeK8sService(), - makeVirtualServiceNotOwnedByChannel(), - testcreds.MakeSecretWithCreds(), - }, - WantPresent: []runtime.Object{ - makeReadyChannel(), - }, - WantEvent: []corev1.Event{ - events[channelReconciled], - }, - }, { Name: "Error planning - subscriber missing UID", InitialState: []runtime.Object{ @@ -573,7 +510,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -594,7 +530,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -618,8 +553,6 @@ func TestReconcile(t *testing.T) { Name: "Create Topic - topic already exists", InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), - makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -632,6 +565,7 @@ func TestReconcile(t *testing.T) { }, }, WantPresent: []runtime.Object{ + makeK8sService(), makeReadyChannel(), }, WantEvent: []corev1.Event{ @@ -643,7 +577,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -665,12 +598,11 @@ func TestReconcile(t *testing.T) { Name: "Create Topic - topic create succeeds", InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), - makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, WantPresent: []runtime.Object{ makeReadyChannel(), + makeK8sService(), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -681,7 +613,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -706,7 +637,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -730,7 +660,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, OtherTestData: map[string]interface{}{ @@ -753,7 +682,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, WantPresent: []runtime.Object{ @@ -768,7 +696,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, Mocks: controllertesting.Mocks{ @@ -784,7 +711,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannel(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, Mocks: controllertesting.Mocks{ @@ -799,7 +725,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeChannelWithFinalizerAndPCS(), makeK8sService(), - makeVirtualService(), testcreds.MakeSecretWithCreds(), }, Mocks: controllertesting.Mocks{ @@ -856,7 +781,9 @@ func makeChannel() *eventingv1alpha1.Channel { func makeChannelWithFinalizerAndPCSAndAddress() *eventingv1alpha1.Channel { c := makeChannelWithFinalizerAndPCS() - c.Status.SetAddress(fmt.Sprintf("%s-channel.%s.svc.%s", c.Name, c.Namespace, utils.GetClusterDomainName())) + // serviceAddress is the address of the K8s Service. It uses a GeneratedName and the fake client + // does not fill in Name, so the name is the empty string. + c.Status.SetAddress(fmt.Sprintf(".%s.svc.%s", c.Namespace, utils.GetClusterDomainName())) return c } @@ -1075,11 +1002,13 @@ func makeK8sService() *corev1.Service { Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-channel", cName), - Namespace: cNamespace, + GenerateName: fmt.Sprintf("%s-channel-", cName), + Namespace: cNamespace, Labels: map[string]string{ - "channel": cName, - "provisioner": ccpName, + util.EventingChannelLabel: cName, + util.OldEventingChannelLabel: cName, + util.EventingProvisionerLabel: ccpName, + util.OldEventingProvisionerLabel: ccpName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -1093,68 +1022,12 @@ func makeK8sService() *corev1.Service { }, }, Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Name: util.PortName, - Port: util.PortNumber, - }, - }, + ExternalName: names.ServiceHostName(fmt.Sprintf("%s-dispatcher", ccpName), system.Namespace()), + Type: corev1.ServiceTypeExternalName, }, } } -func makeVirtualService() *istiov1alpha3.VirtualService { - return &istiov1alpha3.VirtualService{ - TypeMeta: metav1.TypeMeta{ - APIVersion: istiov1alpha3.SchemeGroupVersion.String(), - Kind: "VirtualService", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-channel", cName), - Namespace: cNamespace, - Labels: map[string]string{ - "channel": cName, - "provisioner": ccpName, - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - Kind: "Channel", - Name: cName, - UID: cUID, - Controller: &truePointer, - BlockOwnerDeletion: &truePointer, - }, - }, - }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - fmt.Sprintf("%s-channel.%s.svc.%s", cName, cNamespace, utils.GetClusterDomainName()), - fmt.Sprintf("%s.%s.channels.%s", cName, cNamespace, utils.GetClusterDomainName()), - }, - HTTP: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: fmt.Sprintf("%s.%s.channels.%s", cName, cNamespace, utils.GetClusterDomainName()), - }, - Route: []istiov1alpha3.HTTPRouteDestination{{ - Destination: istiov1alpha3.Destination{ - Host: "in-memory-channel-clusterbus.knative-eventing.svc." + utils.GetClusterDomainName(), - Port: istiov1alpha3.PortSelector{ - Number: util.PortNumber, - }, - }}, - }}, - }, - }, - } -} - -func makeVirtualServiceNotOwnedByChannel() *istiov1alpha3.VirtualService { - vs := makeVirtualService() - vs.OwnerReferences = nil - return vs -} - func errorOnSecondChannelGet() []controllertesting.MockGet { passThrough := []controllertesting.MockGet{ func(innerClient client.Client, ctx context.Context, key client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -1185,18 +1058,6 @@ func errorListingK8sService() []controllertesting.MockList { }, } } - -func errorListingVirtualService() []controllertesting.MockList { - return []controllertesting.MockList{ - func(_ client.Client, _ context.Context, _ *client.ListOptions, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*istiov1alpha3.VirtualServiceList); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorCreatingK8sService() []controllertesting.MockCreate { return []controllertesting.MockCreate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -1208,17 +1069,6 @@ func errorCreatingK8sService() []controllertesting.MockCreate { } } -func errorCreatingVirtualService() []controllertesting.MockCreate { - return []controllertesting.MockCreate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*istiov1alpha3.VirtualService); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorUpdatingChannel() []controllertesting.MockUpdate { return []controllertesting.MockUpdate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { diff --git a/contrib/gcppubsub/pkg/controller/cmd/main.go b/contrib/gcppubsub/pkg/controller/cmd/main.go index baa5540a9f0..ce4961befc5 100644 --- a/contrib/gcppubsub/pkg/controller/cmd/main.go +++ b/contrib/gcppubsub/pkg/controller/cmd/main.go @@ -21,15 +21,13 @@ import ( "log" "os" - "github.com/knative/eventing/pkg/provisioners" - v1 "k8s.io/api/core/v1" - "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/channel" "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/eventing/pkg/provisioners" "github.com/knative/pkg/signals" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -62,7 +60,6 @@ func main() { // Add custom types to this array to get them into the manager's scheme. eventingv1alpha1.AddToScheme(mgr.GetScheme()) - istiov1alpha3.AddToScheme(mgr.GetScheme()) // The controllers for both the ClusterChannelProvisioner and the Channels created by that // ClusterChannelProvisioner run in this process. diff --git a/contrib/gcppubsub/pkg/dispatcher/cmd/main.go b/contrib/gcppubsub/pkg/dispatcher/cmd/main.go index 078e08bf2d4..4b03eb258c3 100644 --- a/contrib/gcppubsub/pkg/dispatcher/cmd/main.go +++ b/contrib/gcppubsub/pkg/dispatcher/cmd/main.go @@ -17,6 +17,7 @@ package main import ( + "context" "flag" "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" @@ -29,6 +30,7 @@ import ( "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // This is the main method for the GCP PubSub Channel dispatcher. It handles all the data-plane @@ -61,7 +63,7 @@ func main() { // PubSub) and the dispatcher (takes messages in PubSub and sends them in cluster) in this // binary. - _, runnables, err := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator) + receiver, runnables, err := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator) if err != nil { logger.Fatal("Unable to create new receiver and runnable", zap.Error(err)) } @@ -72,7 +74,14 @@ func main() { } } - if _, err = dispatcher.New(mgr, logger.Desugar()); err != nil { + if _, err = dispatcher.New( + mgr, + logger.Desugar(), + []dispatcher.ReconcileHandler{ + func(ctx context.Context, _ reconcile.Request) error { + return receiver.UpdateHostToChannelMap(ctx) + }, + }); err != nil { logger.Fatal("Unable to create the dispatcher", zap.Error(err)) } diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go index 00dd7cdf44a..781ea63b6b9 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go @@ -21,15 +21,13 @@ import ( "sync" "time" - "k8s.io/client-go/util/workqueue" - - "sigs.k8s.io/controller-runtime/pkg/event" - pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/source" @@ -53,7 +51,7 @@ const ( // New returns a Controller that represents the dispatcher portion (messages from GCP PubSub are // sent into the cluster) of the GCP PubSub dispatcher. We use a reconcile loop to watch all // Channels and notice changes to them. It uses an exponential backoff to throttle the retries. -func New(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) { +func New(mgr manager.Manager, logger *zap.Logger, additionalHandlers []ReconcileHandler) (controller.Controller, error) { // reconcileChan is used when the dispatcher itself needs to force reconciliation of a Channel. reconcileChan := make(chan event.GenericEvent) @@ -71,7 +69,8 @@ func New(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) subscriptionsLock: sync.Mutex{}, subscriptions: map[channelName]map[subscriptionName]context.CancelFunc{}, - rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay), + rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay), + additionalHandlers: additionalHandlers, } c, err := controller.New(controllerAgentName, mgr, controller.Options{ diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go index 23d0884d981..78c51756373 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go @@ -22,20 +22,18 @@ import ( "sync" "time" - v1 "k8s.io/api/core/v1" - - "k8s.io/client-go/util/workqueue" - - ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" + "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/channel" pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -54,6 +52,9 @@ type channelName = types.NamespacedName type subscriptionName = types.UID type empty struct{} +// ReconcileHandler will be run by in addition to existing reconcile. +type ReconcileHandler func(context.Context, reconcile.Request) error + // reconciler reconciles Channels with the gcp-pubsub provisioner. It sets up hanging polling for // every Subscription to any Channel. type reconciler struct { @@ -76,6 +77,8 @@ type reconciler struct { // rateLimiter is used to limit the pace at which we nack a message when it could not be dispatched. rateLimiter workqueue.RateLimiter + + additionalHandlers []ReconcileHandler } // Verify the struct implements reconcile.Reconciler @@ -106,7 +109,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } // Does this Controller control this Channel? - if !r.shouldReconcile(c) { + if !channel.ShouldReconcile(c) { logging.FromContext(ctx).Info("Not reconciling Channel, it is not controlled by this Controller", zap.Any("ref", c.Spec)) return reconcile.Result{}, nil } @@ -145,15 +148,6 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err }, reconcileErr } -// shouldReconcile determines if this Controller should control (and therefore reconcile) a given -// ClusterChannelProvisioner. This Controller only handles gcp-pubsub Channels. -func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { - if c.Spec.Provisioner != nil { - return ccpcontroller.IsControlled(c.Spec.Provisioner) - } - return false -} - // reconcile reconciles this Channel so that the real world matches the intended state. The returned // boolean indicates if this Channel should be immediately requeued for another reconcile loop. The // returned error indicates an error during reconciliation. @@ -176,6 +170,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel, return true, nil } + for _, h := range r.additionalHandlers { + if err := h(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: c.Name, Namespace: c.Namespace}}); err != nil { + logging.FromContext(ctx).Error("Failed reconcile.", zap.Error(err)) + return false, err + } + } // enqueueChannelForReconciliation is a function that when run will force this Channel to be // reconciled again. enqueueChannelForReconciliation := func() { diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go index 2b8d5ae0544..6814dc6448e 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go @@ -26,32 +26,25 @@ import ( "testing" "time" - "k8s.io/client-go/util/workqueue" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/knative/eventing/pkg/provisioners" - - "sigs.k8s.io/controller-runtime/pkg/event" - + "github.com/knative/eventing/contrib/gcppubsub/pkg/util/fakepubsub" "github.com/knative/eventing/contrib/gcppubsub/pkg/util/testcreds" "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - "github.com/knative/eventing/pkg/utils" - - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/fakepubsub" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/eventing/pkg/utils" _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -65,10 +58,11 @@ const ( gcpProject = "gcp-project" - pscData = "pscData" - reconcileChan = "reconcileChan" - shouldBeCanceled = "shouldBeCanceled" - shouldNotBeCanceled = "shouldNotBeCanceled" + pscData = "pscData" + reconcileChan = "reconcileChan" + shouldBeCanceled = "shouldBeCanceled" + shouldNotBeCanceled = "shouldNotBeCanceled" + additionalHandlerError = "Error in additional test handler." ) var ( @@ -93,6 +87,8 @@ var ( dispatcherReconcileFailed: {Reason: dispatcherReconcileFailed, Type: corev1.EventTypeWarning}, dispatcherUpdateStatusFailed: {Reason: dispatcherUpdateStatusFailed, Type: corev1.EventTypeWarning}, } + + hostname = fmt.Sprintf("%s-channel.%s.svc.%s", cName, cNamespace, utils.GetClusterDomainName()) ) func init() { @@ -367,6 +363,22 @@ func TestReconcile(t *testing.T) { events[dispatcherReconciled], events[dispatcherUpdateStatusFailed], }, }, + { + Name: "Fail additional reconcile handler", + InitialState: []runtime.Object{ + makeChannelWithSubscribersAndFinalizer(), + testcreds.MakeSecretWithCreds(), + }, + WantPresent: []runtime.Object{ + makeChannelWithSubscribersAndFinalizer(), + }, + WantEvent: []corev1.Event{ + events[dispatcherReconcileFailed], + }, + OtherTestData: map[string]interface{}{ + additionalHandlerError: additionalHandlerError, + }, + }, // Note - we do not test update status since this dispatcher only adds // finalizers to the channel } @@ -414,12 +426,19 @@ func TestReconcile(t *testing.T) { r.subscriptions[c][s] = cc.wantNotCancel(c, s) } } + if tc.OtherTestData[additionalHandlerError] != nil { + r.additionalHandlers = []ReconcileHandler{ + func(_ context.Context, _ reconcile.Request) error { + return fmt.Errorf(tc.OtherTestData[additionalHandlerError].(string)) + }, + } + tc.WantErrMsg = additionalHandlerError + } tc.AdditionalVerification = append(tc.AdditionalVerification, cc.verify) tc.IgnoreTimes = true t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } - func TestReceiveFunc(t *testing.T) { testCases := map[string]struct { ack bool @@ -518,7 +537,7 @@ func makeChannel() *eventingv1alpha1.Channel { }, } c.Status.InitializeConditions() - c.Status.SetAddress(fmt.Sprintf("%s-channel.%s.svc.%s", c.Name, c.Namespace, utils.GetClusterDomainName())) + c.Status.SetAddress(hostname) c.Status.MarkProvisioned() pcs := &util.GcpPubSubChannelStatus{ GCPProject: gcpProject, @@ -638,6 +657,16 @@ func errorGettingChannel() []controllertesting.MockGet { } } +func errorListingChannels() []controllertesting.MockList { + return []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*eventingv1alpha1.ChannelList); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} func errorUpdatingChannel() []controllertesting.MockUpdate { return []controllertesting.MockUpdate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go index 665bb80dda5..09117122e46 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go @@ -20,11 +20,15 @@ import ( "context" "errors" "fmt" + "sync" + "sync/atomic" "cloud.google.com/go/pubsub" + "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/channel" "github.com/knative/eventing/contrib/gcppubsub/pkg/dispatcher/receiver/cache" "github.com/knative/eventing/contrib/gcppubsub/pkg/util" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/channelwatcher" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" @@ -40,6 +44,9 @@ type Receiver struct { pubSubClientCreator util.PubSubClientCreator cache *cache.TTL + + hostToChannelMapMutex sync.Mutex + hostToChannelMap atomic.Value } // New creates a new Receiver and its associated MessageReceiver. The caller is responsible for @@ -52,7 +59,9 @@ func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubS pubSubClientCreator: pubSubClientCreator, cache: cache.NewTTL(), } + r.setHostToChannelMap(map[string]provisioners.ChannelReference{}) receiver, err := r.newMessageReceiver() + if err != nil { return nil, nil, err } @@ -60,7 +69,18 @@ func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubS } func (r *Receiver) newMessageReceiver() (*provisioners.MessageReceiver, error) { - return provisioners.NewMessageReceiver(r.sendEventToTopic, r.logger.Sugar()) + return provisioners.NewMessageReceiver( + r.sendEventToTopic, + r.logger.Sugar(), + provisioners.ResolveChannelFromHostHeader(r.getChannelReferenceFromHost)) +} +func (r *Receiver) getChannelReferenceFromHost(host string) (provisioners.ChannelReference, error) { + chMap := r.getHostToChannelMap() + cr, ok := chMap[host] + if !ok { + return cr, fmt.Errorf("Invalid HostName:%q. HostName not found in any of the watched gcp-pubsub channels", host) + } + return cr, nil } // sendEventToTopic sends a message to the Cloud Pub/Sub Topic backing the Channel. @@ -152,3 +172,36 @@ func (r *Receiver) getChannel(ctx context.Context, ref provisioners.ChannelRefer return c, err } +func (r *Receiver) getHostToChannelMap() map[string]provisioners.ChannelReference { + return r.hostToChannelMap.Load().(map[string]provisioners.ChannelReference) +} + +func (r *Receiver) setHostToChannelMap(hcMap map[string]provisioners.ChannelReference) { + r.hostToChannelMap.Store(hcMap) +} + +// UpdateHostToChannelMap will be called from the controller that watches gcp-pubsub channels. +// It will update internal hostToChannelMap which is used to resolve the hostHeader of the +// incoming request to the correct ChannelReference in the receiver function. +func (r *Receiver) UpdateHostToChannelMap(ctx context.Context) error { + logging.FromContext(ctx).Debug("UpdateHostToChannelMap: Acquiring mutex lock") + r.hostToChannelMapMutex.Lock() + defer r.hostToChannelMapMutex.Unlock() + logging.FromContext(ctx).Debug("UpdateHostToChannelMap: Acquired mutex lock. Updating internal map") + + chanList, err := channelwatcher.ListAllChannels(ctx, r.client, channel.ShouldReconcile) + if err != nil { + logging.FromContext(ctx).Error("UpdateHostToChannelMap: Failed to list all channels.", zap.Error(err)) + return err + } + + hostToChanMap, err := provisioners.NewHostNameToChannelRefMap(chanList) + if err != nil { + logging.FromContext(ctx).Error("UpdateHostToChannelMap: Error occured when creating the new hostToChannel map.", zap.Error(err)) + return err + } + + r.setHostToChannelMap(hostToChanMap) + logging.FromContext(ctx).Info("UpdateHostToChannelMap: Update successful.") + return nil +} diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go index c4789c2c9ed..f850d26049a 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go @@ -19,25 +19,25 @@ package receiver import ( "context" "errors" + "fmt" "net/http/httptest" "strings" "testing" + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "k8s.io/client-go/kubernetes/scheme" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/fakepubsub" - "github.com/knative/eventing/pkg/utils" + "github.com/knative/eventing/contrib/gcppubsub/pkg/util/testcreds" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "go.uber.org/zap" - + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/testcreds" ) const ( @@ -54,6 +54,9 @@ const ( "contentType" : "text/xml", "data" : "" }` + ccpName = "gcp-pubsub" + listChannelsFailed = "Failed to list channels" + hostname = "a.b.c.d" ) func init() { @@ -76,14 +79,14 @@ func TestReceiver(t *testing.T) { "can't read status": { initialState: []runtime.Object{ testcreds.MakeSecretWithInvalidCreds(), - makeChannelWithBadStatus(), + makeChannel(withBadStatus()), }, expectedErr: true, }, "blank status": { initialState: []runtime.Object{ testcreds.MakeSecretWithInvalidCreds(), - makeChannelWithBlankStatus(), + makeChannel(withBlankStatus()), }, expectedErr: true, }, @@ -123,7 +126,7 @@ func TestReceiver(t *testing.T) { "Publish succeeds": { initialState: []runtime.Object{ testcreds.MakeSecretWithCreds(), - makeChannel(), + makeChannel(withStatusReady(hostname)), }, }, } @@ -136,13 +139,15 @@ func TestReceiver(t *testing.T) { if err != nil { t.Fatalf("Error when creating a New receiver. Error:%s", err) } + mr.setHostToChannelMap(map[string]provisioners.ChannelReference{}) resp := httptest.NewRecorder() req := httptest.NewRequest("POST", "/", strings.NewReader(validMessage)) - req.Host = "test-channel.test-namespace.channels." + utils.GetClusterDomainName() + req.Host = hostname receiver, err := mr.newMessageReceiver() if err != nil { t.Fatalf("Error when creating a new message receiver. Error:%s", err) } + mr.UpdateHostToChannelMap(context.TODO()) receiver.HandleRequest(resp, req) if tc.expectedErr { if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 { @@ -157,7 +162,112 @@ func TestReceiver(t *testing.T) { } } -func makeChannel() *eventingv1alpha1.Channel { +func TestUpdateHostToChannelMap(t *testing.T) { + testCases := []struct { + name string + initialState []runtime.Object + expectedMap map[string]provisioners.ChannelReference + expectedErrMsg string + mocks controllertesting.Mocks + }{ + { + name: "client.List() channels fails.", + initialState: []runtime.Object{ + makeChannel(withStatusReady(hostname)), + }, + expectedErrMsg: listChannelsFailed, + expectedMap: map[string]provisioners.ChannelReference{}, + mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, fmt.Errorf(listChannelsFailed) + }, + }, + }, + }, + { + name: "Duplciate hostnames.", + initialState: []runtime.Object{ + makeChannel(withName("chan1"), withNamespace("ns1"), withStatusReady("host.name")), + makeChannel(withName("chan2"), withNamespace("ns2"), withStatusReady("host.name")), + }, + expectedErrMsg: "Duplicate hostName found. Each channel must have a unique host header. HostName:host.name, channel:ns2.chan2, channel:ns1.chan1", + expectedMap: map[string]provisioners.ChannelReference{}, + }, + { + name: "Successfully updated hostToChannelMap.", + initialState: []runtime.Object{ + makeChannel(withName("chan1"), withNamespace("ns1"), withStatusReady("host.name1")), + makeChannel(withName("chan2"), withNamespace("ns2"), withStatusReady("host.name2")), + }, + expectedMap: map[string]provisioners.ChannelReference{ + "host.name1": provisioners.ChannelReference{Name: "chan1", Namespace: "ns1"}, + "host.name2": provisioners.ChannelReference{Name: "chan2", Namespace: "ns2"}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := controllertesting.NewMockClient(fake.NewFakeClient(tc.initialState...), tc.mocks) + r, _, err := New(zap.NewNop(), c, fakepubsub.Creator(nil)) + if err != nil { + t.Fatalf("Failed to create receiver.") + } + if err := r.UpdateHostToChannelMap(context.Background()); err != nil { + if diff := cmp.Diff(tc.expectedErrMsg, err.Error()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + } else if tc.expectedErrMsg != "" { + t.Fatalf("Want error:%s, Got nil", tc.expectedErrMsg) + } + + if diff := cmp.Diff(tc.expectedMap, r.getHostToChannelMap()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + }) + } +} + +type option func(*eventingv1alpha1.Channel) + +func withName(name string) option { + return func(c *eventingv1alpha1.Channel) { + c.Name = name + } +} + +func withNamespace(ns string) option { + return func(c *eventingv1alpha1.Channel) { + c.Namespace = ns + } +} + +func withStatusReady(hn string) option { + return func(c *eventingv1alpha1.Channel) { + c.Status.InitializeConditions() + c.Status.InitializeConditions() + c.Status.MarkProvisioned() + c.Status.MarkProvisionerInstalled() + c.Status.SetAddress(hn) + } +} + +func withBlankStatus() option { + return func(c *eventingv1alpha1.Channel) { + c.Status = eventingv1alpha1.ChannelStatus{} + } +} + +func withBadStatus() option { + return func(c *eventingv1alpha1.Channel) { + c.Status.Internal = &runtime.RawExtension{ + // SecretKey must be a string, not an integer, so this will fail during json.Unmarshal. + Raw: []byte(`{"secretKey": 123}`), + } + } +} + +func makeChannel(opts ...option) *eventingv1alpha1.Channel { c := &eventingv1alpha1.Channel{ TypeMeta: v1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", @@ -167,6 +277,11 @@ func makeChannel() *eventingv1alpha1.Channel { Namespace: "test-namespace", Name: "test-channel", }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &corev1.ObjectReference{ + Name: ccpName, + }, + }, } pcs := &util.GcpPubSubChannelStatus{ GCPProject: "project", @@ -176,20 +291,9 @@ func makeChannel() *eventingv1alpha1.Channel { if err := util.SetInternalStatus(context.Background(), c, pcs); err != nil { panic(err) } - return c -} - -func makeChannelWithBlankStatus() *eventingv1alpha1.Channel { - c := makeChannel() - c.Status = eventingv1alpha1.ChannelStatus{} - return c -} -func makeChannelWithBadStatus() *eventingv1alpha1.Channel { - c := makeChannel() - c.Status.Internal = &runtime.RawExtension{ - // SecretKey must be a string, not an integer, so this will fail during json.Unmarshal. - Raw: []byte(`{"secretKey": 123}`), + for _, opt := range opts { + opt(c) } return c } diff --git a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go index ecbb7d8211f..8d900564b63 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go @@ -76,6 +76,7 @@ func NewDispatcher(natssURL, clusterID string, logger *zap.Logger) (*Subscriptio clusterID: clusterID, subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription), } + d.setHostToChannelMap(map[string]provisioners.ChannelReference{}) receiver, err := provisioners.NewMessageReceiver( createReceiverFunction(d, logger.Sugar()), logger.Sugar(), @@ -84,8 +85,6 @@ func NewDispatcher(natssURL, clusterID string, logger *zap.Logger) (*Subscriptio return nil, err } d.receiver = receiver - d.setHostToChannelMap(map[string]provisioners.ChannelReference{}) - return d, nil } @@ -313,21 +312,11 @@ func (s *SubscriptionsSupervisor) setHostToChannelMap(hcMap map[string]provision // It will update internal hostToChannelMap which is used to resolve the hostHeader of the // incoming request to the correct ChannelReference in the receiver function. func (s *SubscriptionsSupervisor) UpdateHostToChannelMap(ctx context.Context, chanList []eventingv1alpha1.Channel) error { - hostToChanMap := make(map[string]provisioners.ChannelReference, len(chanList)) - for _, c := range chanList { - hostName := c.Status.Address.Hostname - if cr, ok := hostToChanMap[hostName]; ok { - return fmt.Errorf( - "Duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", - hostName, - c.Namespace, - c.Name, - cr.Namespace, - cr.Name) - } - hostToChanMap[hostName] = provisioners.ChannelReference{Name: c.Name, Namespace: c.Namespace} + hostToChanMap, err := provisioners.NewHostNameToChannelRefMap(chanList) + if err != nil { + logging.FromContext(ctx).Info("UpdateHostToChannelMap: Error occured when creating the new hostToChannel map.", zap.Error(err)) + return err } - s.setHostToChannelMap(hostToChanMap) logging.FromContext(ctx).Info("hostToChannelMap updated successfully.") return nil diff --git a/pkg/channelwatcher/channel_watcher.go b/pkg/channelwatcher/channel_watcher.go index 249fe2ef439..011554b93dd 100644 --- a/pkg/channelwatcher/channel_watcher.go +++ b/pkg/channelwatcher/channel_watcher.go @@ -39,7 +39,7 @@ type reconciler struct { } func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { - ctx := logging.WithLogger(context.TODO(), r.logger.With(zap.Any("request", req))) + ctx := logging.WithLogger(context.Background(), r.logger.With(zap.Any("request", req))) logging.FromContext(ctx).Info("New update for channel.") if err := r.handler(ctx, r.client, req.NamespacedName); err != nil { logging.FromContext(ctx).Error("WatchHandlerFunc returned error", zap.Error(err)) diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index 69e1224e062..df914bc8a6b 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -394,3 +394,22 @@ func channelServiceName(channelName string) string { func channelHostName(channelName, namespace string) string { return fmt.Sprintf("%s.%s.channels.%s", channelName, namespace, utils.GetClusterDomainName()) } + +// NewHostNameToChannelRefMap parses each channel from cList and creates a map[string(Status.Address.HostName)]ChannelReference +func NewHostNameToChannelRefMap(cList []eventingv1alpha1.Channel) (map[string]ChannelReference, error) { + hostToChanMap := make(map[string]ChannelReference, len(cList)) + for _, c := range cList { + hostName := c.Status.Address.Hostname + if cr, ok := hostToChanMap[hostName]; ok { + return nil, fmt.Errorf( + "Duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", + hostName, + c.Namespace, + c.Name, + cr.Namespace, + cr.Name) + } + hostToChanMap[hostName] = ChannelReference{Name: c.Name, Namespace: c.Namespace} + } + return hostToChanMap, nil +} diff --git a/pkg/provisioners/inmemory/channel/reconcile_test.go b/pkg/provisioners/inmemory/channel/reconcile_test.go index 76d75f3c7aa..e5af96d4dc2 100644 --- a/pkg/provisioners/inmemory/channel/reconcile_test.go +++ b/pkg/provisioners/inmemory/channel/reconcile_test.go @@ -32,7 +32,6 @@ import ( "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "github.com/knative/eventing/pkg/utils" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/system" _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" @@ -191,7 +190,6 @@ func init() { // Add types to scheme. _ = eventingv1alpha1.AddToScheme(scheme.Scheme) _ = corev1.AddToScheme(scheme.Scheme) - _ = istiov1alpha3.AddToScheme(scheme.Scheme) } func TestInjectClient(t *testing.T) { diff --git a/pkg/provisioners/inmemory/controller/main.go b/pkg/provisioners/inmemory/controller/main.go index 2b09c992b4f..6a1921d7c31 100644 --- a/pkg/provisioners/inmemory/controller/main.go +++ b/pkg/provisioners/inmemory/controller/main.go @@ -24,7 +24,6 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/provisioners/inmemory/clusterchannelprovisioner" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/signals" "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -50,7 +49,6 @@ func main() { // Add custom types to this array to get them into the manager's scheme. eventingv1alpha1.AddToScheme(mgr.GetScheme()) - istiov1alpha3.AddToScheme(mgr.GetScheme()) // The controllers for both the ClusterChannelProvisioner and the Channels created by that // ClusterChannelProvisioner run in this process.