diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go index 54e4c23d559..64ae846dfae 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go @@ -101,6 +101,14 @@ func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *appsv1.Deployme } } +func (bs *BrokerStatus) MarkIngressReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) +} + +func (bs *BrokerStatus) MarkFilterReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) +} + // SetAddress makes this Broker addressable by setting the hostname. It also // sets the BrokerConditionAddressable to true. func (bs *BrokerStatus) SetAddress(url *apis.URL) { diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 607b848042f..dc0c1cc1a14 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -22,18 +22,15 @@ import ( "fmt" "go.uber.org/zap" - v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/dynamic" - appsv1listers "k8s.io/client-go/listers/apps/v1" - corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/pkg/apis" + "knative.dev/pkg/kmeta" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/apis/eventing" @@ -45,7 +42,7 @@ import ( "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/broker/resources" - "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/eventing/pkg/reconciler/internal/service" duckapis "knative.dev/pkg/apis/duck" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" @@ -62,11 +59,11 @@ type Reconciler struct { // listers index properties about resources brokerLister eventinglisters.BrokerLister - serviceLister corev1listers.ServiceLister - deploymentLister appsv1listers.DeploymentLister subscriptionLister messaginglisters.SubscriptionLister triggerLister eventinglisters.TriggerLister + serviceReconciler service.Reconciler + channelableTracker duck.ListableTracker ingressImage string @@ -112,12 +109,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr return nil } } - filterSvc, err := r.reconcileKind(ctx, b) + filterSvcURL, err := r.reconcileKind(ctx, b) if b.Status.IsReady() { // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon, so reconcile them. - te := r.reconcileTriggers(ctx, b, filterSvc) + te := r.reconcileTriggers(ctx, b, filterSvcURL) if te != nil { logging.FromContext(ctx).Error("Problem reconciling triggers", zap.Error(te)) return fmt.Errorf("failed to reconcile triggers: %v", te) @@ -131,7 +128,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr return err } -func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, pkgreconciler.Event) { +func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*apis.URL, pkgreconciler.Event) { logging.FromContext(ctx).Debug("Reconciling", zap.Any("Broker", b)) b.Status.InitializeConditions() b.Status.ObservedGeneration = b.Generation @@ -183,42 +180,34 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*co b.Status.TriggerChannel = &chanMan.ref b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status) - filterDeployment, err := r.reconcileFilterDeployment(ctx, b) - if err != nil { - logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err)) - b.Status.MarkFilterFailed("DeploymentFailure", "%v", err) - return nil, err - } - filterSvc, err := r.reconcileFilterService(ctx, b) + filterStatus, err := r.reconcileFilterService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err)) b.Status.MarkFilterFailed("ServiceFailure", "%v", err) return nil, err } - b.Status.PropagateFilterDeploymentAvailability(filterDeployment) - - ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan) - if err != nil { - logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) - b.Status.MarkIngressFailed("DeploymentFailure", "%v", err) - return nil, err + if filterStatus.IsReady { + b.Status.MarkFilterReady() + } else { + b.Status.MarkFilterFailed(filterStatus.Reason, filterStatus.Message) } - svc, err := r.reconcileIngressService(ctx, b) + ingressStatus, err := r.reconcileIngressService(ctx, b, triggerChan) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err)) b.Status.MarkIngressFailed("ServiceFailure", "%v", err) return nil, err } - b.Status.PropagateIngressDeploymentAvailability(ingressDeployment) - b.Status.SetAddress(&apis.URL{ - Scheme: "http", - Host: names.ServiceHostName(svc.Name, svc.Namespace), - }) + if ingressStatus.IsReady { + b.Status.MarkIngressReady() + } else { + b.Status.MarkIngressFailed(ingressStatus.Reason, ingressStatus.Message) + } + b.Status.SetAddress(ingressStatus.URL) // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon. - return filterSvc, nil + return filterStatus.URL, nil } type channelTemplate struct { @@ -298,20 +287,14 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1alpha1.Broker) pkgre return newReconciledNormal(b.Namespace, b.Name) } -// reconcileFilterDeployment reconciles Broker's 'b' filter deployment. -func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) (*v1.Deployment, error) { - expected := resources.MakeFilterDeployment(&resources.FilterArgs{ +// reconcileFilterService reconciles Broker's 'b' filter service. +func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*service.Status, error) { + svcArgs := resources.MakeFilterServiceArgs(&resources.FilterArgs{ Broker: b, Image: r.filterImage, ServiceAccountName: r.filterServiceAccountName, }) - return r.reconcileDeployment(ctx, expected) -} - -// reconcileFilterService reconciles Broker's 'b' filter service. -func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { - expected := resources.MakeFilterService(b) - return r.reconcileService(ctx, expected) + return r.serviceReconciler.Reconcile(ctx, *kmeta.NewControllerRef(b), *svcArgs) } // reconcileChannel reconciles Broker's 'b' underlying channel. @@ -362,78 +345,19 @@ func TriggerChannelLabels(brokerName string) map[string]string { } } -// reconcileDeployment reconciles the K8s Deployment 'd'. -func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) { - current, err := r.deploymentLister.Deployments(d.Namespace).Get(d.Name) - if apierrs.IsNotFound(err) { - current, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d) - if err != nil { - return nil, err - } - return current, nil - } else if err != nil { - return nil, err - } - - if !equality.Semantic.DeepDerivative(d.Spec, current.Spec) { - // Don't modify the informers copy. - desired := current.DeepCopy() - desired.Spec = d.Spec - current, err = r.KubeClientSet.AppsV1().Deployments(current.Namespace).Update(desired) - if err != nil { - return nil, err - } - } - return current, nil -} - -// reconcileService reconciles the K8s Service 'svc'. -func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) { - current, err := r.serviceLister.Services(svc.Namespace).Get(svc.Name) - if apierrs.IsNotFound(err) { - current, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc) - if err != nil { - return nil, err - } - return current, nil - } else if err != nil { - return nil, err - } - - // spec.clusterIP is immutable and is set on existing services. If we don't set this to the same value, we will - // encounter an error while updating. - svc.Spec.ClusterIP = current.Spec.ClusterIP - if !equality.Semantic.DeepDerivative(svc.Spec, current.Spec) { - // Don't modify the informers copy. - desired := current.DeepCopy() - desired.Spec = svc.Spec - current, err = r.KubeClientSet.CoreV1().Services(current.Namespace).Update(desired) - if err != nil { - return nil, err - } - } - return current, nil -} - // reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel. -func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*v1.Deployment, error) { - expected := resources.MakeIngressDeployment(&resources.IngressArgs{ +func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*service.Status, error) { + svcArgs := resources.MakeIngressServiceArgs(&resources.IngressArgs{ Broker: b, Image: r.ingressImage, ServiceAccountName: r.ingressServiceAccountName, ChannelAddress: c.Status.Address.GetURL().Host, }) - return r.reconcileDeployment(ctx, expected) -} - -// reconcileIngressService reconciles the Ingress Service. -func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { - expected := resources.MakeIngressService(b) - return r.reconcileService(ctx, expected) + return r.serviceReconciler.Reconcile(ctx, *kmeta.NewControllerRef(b), *svcArgs) } // reconcileTriggers reconciles the Triggers that are pointed to this broker -func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc *corev1.Service) error { +func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvcURL *apis.URL) error { // TODO: Figure out the labels stuff... If webhook does it, we can filter like this... // Find all the Triggers that have been labeled as belonging to me @@ -447,7 +371,7 @@ func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, for _, t := range triggers { if t.Spec.Broker == b.Name { trigger := t.DeepCopy() - tErr := r.reconcileTrigger(ctx, b, trigger, filterSvc) + tErr := r.reconcileTrigger(ctx, b, trigger, filterSvcURL) if tErr != nil { r.Logger.Error("Reconciling trigger failed:", zap.String("name", t.Name), zap.Error(err)) r.Recorder.Eventf(trigger, corev1.EventTypeWarning, triggerReconcileFailed, "Trigger reconcile failed: %v", tErr) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 0f7ef5551e7..f56d1fd3a80 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -40,6 +40,7 @@ import ( "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/broker/resources" + kubeservice "knative.dev/eventing/pkg/reconciler/internal/service/kube" "knative.dev/eventing/pkg/utils" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -49,6 +50,7 @@ import ( "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" v1a1addr "knative.dev/pkg/client/injection/ducks/duck/v1alpha1/addressable" v1b1addr "knative.dev/pkg/client/injection/ducks/duck/v1beta1/addressable" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" logtesting "knative.dev/pkg/logging/testing" @@ -309,11 +311,11 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelReady(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithFilterFailed("DeploymentFailure", "inducing failure for create deployments")), + WithFilterFailed("ServiceFailure", "failed to create deployment: inducing failure for create deployments")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create deployments"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to create deployment: inducing failure for create deployments"), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, brokerName), @@ -342,7 +344,7 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelReady(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithFilterFailed("DeploymentFailure", "inducing failure for update deployments")), + WithFilterFailed("ServiceFailure", "failed to update deployment: inducing failure for update deployments")), }}, WantUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewDeployment(filterDeploymentName, testNS, @@ -353,7 +355,7 @@ func TestReconcile(t *testing.T) { }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for update deployments"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to update deployment: inducing failure for update deployments"), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, brokerName), @@ -388,11 +390,11 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelReady(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithFilterFailed("ServiceFailure", "inducing failure for create services")), + WithFilterFailed("ServiceFailure", "failed to create service: inducing failure for create services")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create services"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to create service: inducing failure for create services"), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, brokerName), @@ -431,11 +433,11 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelReady(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithFilterFailed("ServiceFailure", "inducing failure for update services")), + WithFilterFailed("ServiceFailure", "failed to update service: inducing failure for update services")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for update services"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to update service: inducing failure for update services"), }, WantErr: true, WantPatches: []clientgotesting.PatchActionImpl{ @@ -477,11 +479,11 @@ func TestReconcile(t *testing.T) { WithTriggerChannelReady(), WithFilterDeploymentAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithIngressFailed("DeploymentFailure", "inducing failure for create deployments")), + WithIngressFailed("ServiceFailure", "failed to create deployment: inducing failure for create deployments")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create deployments"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to create deployment: inducing failure for create deployments"), }, WantErr: true, WantPatches: []clientgotesting.PatchActionImpl{ @@ -531,11 +533,11 @@ func TestReconcile(t *testing.T) { WithTriggerChannelReady(), WithFilterDeploymentAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithIngressFailed("DeploymentFailure", "inducing failure for update deployments")), + WithIngressFailed("ServiceFailure", "failed to update deployment: inducing failure for update deployments")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for update deployments"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to update deployment: inducing failure for update deployments"), }, WantErr: true, WantPatches: []clientgotesting.PatchActionImpl{ @@ -580,11 +582,11 @@ func TestReconcile(t *testing.T) { WithTriggerChannelReady(), WithFilterDeploymentAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithIngressFailed("ServiceFailure", "inducing failure for create services")), + WithIngressFailed("ServiceFailure", "failed to create service: inducing failure for create services")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create services"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to create service: inducing failure for create services"), }, WantErr: true, WantPatches: []clientgotesting.PatchActionImpl{ @@ -633,11 +635,11 @@ func TestReconcile(t *testing.T) { WithTriggerChannelReady(), WithFilterDeploymentAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), - WithIngressFailed("ServiceFailure", "inducing failure for update services")), + WithIngressFailed("ServiceFailure", "failed to update service: inducing failure for update services")), }}, WantEvents: []string{ finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for update services"), + Eventf(corev1.EventTypeWarning, "InternalError", "failed to update service: inducing failure for update services"), }, WantErr: true, WantPatches: []clientgotesting.PatchActionImpl{ @@ -1392,12 +1394,15 @@ func TestReconcile(t *testing.T) { ctx = v1addr.WithDuck(ctx) ctx = conditions.WithDuck(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + serviceReconciler: &kubeservice.ServiceReconciler{ + KubeClientSet: kubeclient.Get(ctx), + DeploymentLister: listers.GetDeploymentLister(), + ServiceLister: listers.GetServiceLister(), + }, subscriptionLister: listers.GetSubscriptionLister(), triggerLister: listers.GetTriggerLister(), brokerLister: listers.GetBrokerLister(), - serviceLister: listers.GetK8sServiceLister(), - deploymentLister: listers.GetDeploymentLister(), filterImage: filterImage, filterServiceAccountName: filterSA, ingressImage: ingressImage, @@ -1442,6 +1447,9 @@ func livenessProbe() *corev1.Probe { }, InitialDelaySeconds: 5, PeriodSeconds: 2, + TimeoutSeconds: 10, + FailureThreshold: 3, + SuccessThreshold: 1, } } @@ -1455,6 +1463,9 @@ func readinessProbe() *corev1.Probe { }, InitialDelaySeconds: 5, PeriodSeconds: 2, + TimeoutSeconds: 10, + FailureThreshold: 3, + SuccessThreshold: 1, } } @@ -1467,20 +1478,12 @@ func envVars(containerName string) []corev1.EnvVar { Value: system.Namespace(), }, { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, + Name: "NAMESPACE", + Value: testNS, }, { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, + Name: "POD_NAME", + Value: brokerName + "-broker-filter", }, { Name: "CONTAINER_NAME", @@ -1502,20 +1505,12 @@ func envVars(containerName string) []corev1.EnvVar { Value: system.Namespace(), }, { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, + Name: "NAMESPACE", + Value: testNS, }, { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, + Name: "POD_NAME", + Value: brokerName + "-broker-ingress", }, { Name: "CONTAINER_NAME", @@ -1548,10 +1543,6 @@ func containerPorts(httpInternal int32) []corev1.ContainerPort { Name: "http", ContainerPort: httpInternal, }, - { - Name: "metrics", - ContainerPort: 9090, - }, } } @@ -1561,9 +1552,6 @@ func servicePorts(httpInternal int) []corev1.ServicePort { Name: "http", Port: 80, TargetPort: intstr.FromInt(httpInternal), - }, { - Name: "http-metrics", - Port: 9090, }, } return svcPorts @@ -1693,7 +1681,7 @@ func makeBrokerRef() *corev1.ObjectReference { func makeServiceURI() *apis.URL { return &apis.URL{ Scheme: "http", - Host: fmt.Sprintf("%s.%s.svc.%s", makeBrokerFilterService().Name, testNS, utils.GetClusterDomainName()), + Host: fmt.Sprintf("%s.%s.svc.%s", resources.MakeFilterServiceMeta(makeBroker()).Name, testNS, utils.GetClusterDomainName()), Path: fmt.Sprintf("/triggers/%s/%s/%s", testNS, triggerName, triggerUID), } } @@ -1701,7 +1689,11 @@ func makeEmptyDelivery() *eventingduckv1alpha1.DeliverySpec { return nil } func makeBrokerFilterService() *corev1.Service { - return resources.MakeFilterService(makeBroker()) + args := resources.MakeFilterServiceArgs(&resources.FilterArgs{ + Broker: makeBroker(), + Image: "test-image", + }) + return NewService(args.ServiceMeta.Name, args.ServiceMeta.Namespace) } func makeBroker() *v1alpha1.Broker { diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index 8cc8f49fecc..6b019f81010 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -34,8 +34,10 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/broker" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler" + kubeservice "knative.dev/eventing/pkg/reconciler/internal/service/kube" "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" + kubeclient "knative.dev/pkg/client/injection/kube/client" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/configmap" @@ -79,10 +81,13 @@ func NewController( serviceInformer := serviceinformer.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + serviceReconciler: &kubeservice.ServiceReconciler{ + KubeClientSet: kubeclient.Get(ctx), + DeploymentLister: deploymentInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + }, brokerLister: brokerInformer.Lister(), - serviceLister: serviceInformer.Lister(), - deploymentLister: deploymentInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), triggerLister: triggerInformer.Lister(), ingressImage: env.IngressImage, diff --git a/pkg/reconciler/broker/resources/filter.go b/pkg/reconciler/broker/resources/filter.go index f595377807b..975ccce2deb 100644 --- a/pkg/reconciler/broker/resources/filter.go +++ b/pkg/reconciler/broker/resources/filter.go @@ -19,14 +19,12 @@ package resources import ( "fmt" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/eventing/pkg/apis/eventing" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/pkg/kmeta" + "knative.dev/eventing/pkg/reconciler/internal/service" "knative.dev/pkg/system" ) @@ -41,126 +39,83 @@ type FilterArgs struct { ServiceAccountName string } -// MakeFilterDeployment creates the in-memory representation of the Broker's filter Deployment. -func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { - return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: args.Broker.Namespace, - Name: fmt.Sprintf("%s-broker-filter", args.Broker.Name), - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(args.Broker), - }, - Labels: FilterLabels(args.Broker.Name), - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: FilterLabels(args.Broker.Name), - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: FilterLabels(args.Broker.Name), - }, - Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, - Containers: []corev1.Container{ - { - Name: filterContainerName, - Image: args.Image, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, - }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, - }, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/readyz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, - }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, - }, - Env: []corev1.EnvVar{ - { - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "CONTAINER_NAME", - Value: filterContainerName, - }, - { - Name: "BROKER", - Value: args.Broker.Name, - }, - // Used for StackDriver only. - { - Name: "METRICS_DOMAIN", - Value: "knative.dev/internal/eventing", - }, - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8080, - Name: "http", - }, - { - ContainerPort: 9090, - Name: "metrics", - }, - }, - }, - }, - }, - }, - }, +func MakeFilterServiceMeta(b *eventingv1alpha1.Broker) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: b.Namespace, + Name: fmt.Sprintf("%s-broker-filter", b.Name), + Labels: FilterLabels(b.Name), } } -// MakeFilterService creates the in-memory representation of the Broker's filter Service. -func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service { - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: b.Namespace, - Name: fmt.Sprintf("%s-broker-filter", b.Name), - Labels: FilterLabels(b.Name), - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(b), - }, - }, - Spec: corev1.ServiceSpec{ - Selector: FilterLabels(b.Name), - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }, +// MakeFilterServiceArgs creates the in-memory representation of the Broker's filter service arguments. +func MakeFilterServiceArgs(args *FilterArgs) *service.Args { + return &service.Args{ + ServiceMeta: MakeFilterServiceMeta(args.Broker), + DeployMeta: MakeFilterServiceMeta(args.Broker), + PodSpec: corev1.PodSpec{ + ServiceAccountName: args.ServiceAccountName, + Containers: []corev1.Container{ { - Name: "http-metrics", - Port: 9090, + Name: filterContainerName, + Image: args.Image, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + // Port should be the same as the container port. + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + // Port should be the same as the container port. + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + Env: []corev1.EnvVar{ + { + Name: system.NamespaceEnvKey, + Value: system.Namespace(), + }, + { + Name: "NAMESPACE", + Value: args.Broker.Namespace, + }, + { + Name: "POD_NAME", + Value: fmt.Sprintf("%s-broker-filter", args.Broker.Name), + }, + { + Name: "CONTAINER_NAME", + Value: filterContainerName, + }, + { + Name: "BROKER", + Value: args.Broker.Name, + }, + // Used for StackDriver only. + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + }, + }, }, }, }, diff --git a/pkg/reconciler/broker/resources/filter_test.go b/pkg/reconciler/broker/resources/filter_test.go index c0169ee2871..037ed35b989 100644 --- a/pkg/reconciler/broker/resources/filter_test.go +++ b/pkg/reconciler/broker/resources/filter_test.go @@ -17,261 +17,114 @@ limitations under the License. package resources import ( - "encoding/json" + "os" "testing" "github.com/google/go-cmp/cmp" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/reconciler/internal/service" _ "knative.dev/pkg/system/testing" ) -func TestMakeFilterDeployment(t *testing.T) { - testCases := map[string]struct { - args FilterArgs - want []byte - }{ - "happy": { - args: FilterArgs{ - Broker: &v1alpha1.Broker{ - ObjectMeta: v1.ObjectMeta{ - Name: "happy", - }, - Spec: v1alpha1.BrokerSpec{}, - Status: v1alpha1.BrokerStatus{}, - }, - Image: "image-uri", - ServiceAccountName: "service-account-name", - }, - want: []byte(`{ - "metadata": { - "name": "happy-broker-filter", - "creationTimestamp": null, - "labels": { - "eventing.knative.dev/broker": "happy", - "eventing.knative.dev/brokerRole": "filter" - }, - "ownerReferences": [ - { - "apiVersion": "eventing.knative.dev/v1alpha1", - "kind": "Broker", - "name": "happy", - "uid": "", - "controller": true, - "blockOwnerDeletion": true - } - ] - }, - "spec": { - "selector": { - "matchLabels": { - "eventing.knative.dev/broker": "happy", - "eventing.knative.dev/brokerRole": "filter" - } - }, - "template": { - "metadata": { - "creationTimestamp": null, - "labels": { - "eventing.knative.dev/broker": "happy", - "eventing.knative.dev/brokerRole": "filter" - } - }, - "spec": { - "containers": [ - { - "name": "filter", - "image": "image-uri", - "ports": [ - { - "name": "http", - "containerPort": 8080 - }, - { - "name": "metrics", - "containerPort": 9090 - } - ], - "env": [ - { - "name": "SYSTEM_NAMESPACE", - "value": "knative-testing" - }, - { - "name": "NAMESPACE", - "valueFrom": { - "fieldRef": { - "fieldPath": "metadata.namespace" - } - } - }, - { - "name": "POD_NAME", - "valueFrom": { - "fieldRef": { - "fieldPath": "metadata.name" - } - } - }, - { - "name": "CONTAINER_NAME", - "value": "filter" - }, - { - "name": "BROKER", - "value": "happy" - }, - { - "name": "METRICS_DOMAIN", - "value": "knative.dev/internal/eventing" - } - ], - "resources": {}, - "livenessProbe": { - "httpGet": { - "path": "/healthz", - "port": 8080 - }, - "initialDelaySeconds": 5, - "periodSeconds": 2 - }, - "readinessProbe": { - "httpGet": { - "path": "/readyz", - "port": 8080 - }, - "initialDelaySeconds": 5, - "periodSeconds": 2 - } - } - ], - "serviceAccountName": "service-account-name" - } - }, - "strategy": {} - }, - "status": {} -}`), +func TestMakeFilterServiceArgs(t *testing.T) { + _ = os.Setenv("SYSTEM_NAMESPACE", "my-system-ns") + b := &eventingv1alpha1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Namespace: "my-ns", }, } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - dep := MakeFilterDeployment(&tc.args) - - got, err := json.MarshalIndent(dep, "", " ") - if err != nil { - t.Errorf("failed to marshal deployment, %s", err) - } - - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Log(string(got)) - t.Errorf("unexpected deployment (-want, +got) = %v", diff) - } - }) - } -} - -func TestMakeFilterService(t *testing.T) { - testCases := map[string]struct { - broker v1alpha1.Broker - want []byte - }{ - "happy": { - broker: v1alpha1.Broker{ - ObjectMeta: v1.ObjectMeta{ - Name: "happy", - }, - Spec: v1alpha1.BrokerSpec{}, - Status: v1alpha1.BrokerStatus{}, + want := &service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Namespace: "my-ns", + Name: "default-broker-filter", + Labels: map[string]string{ + "eventing.knative.dev/broker": "default", + "eventing.knative.dev/brokerRole": "filter", }, - want: []byte(`{ - "metadata": { - "name": "happy-broker-filter", - "creationTimestamp": null, - "labels": { - "eventing.knative.dev/broker": "happy", - "eventing.knative.dev/brokerRole": "filter" - }, - "ownerReferences": [ - { - "apiVersion": "eventing.knative.dev/v1alpha1", - "kind": "Broker", - "name": "happy", - "uid": "", - "controller": true, - "blockOwnerDeletion": true - } - ] - }, - "spec": { - "ports": [ - { - "name": "http", - "port": 80, - "targetPort": 8080 - }, - { - "name": "http-metrics", - "port": 9090, - "targetPort": 0 - } - ], - "selector": { - "eventing.knative.dev/broker": "happy", - "eventing.knative.dev/brokerRole": "filter" - } - }, - "status": { - "loadBalancer": {} - } -}`), }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - dep := MakeFilterService(&tc.broker) - - got, err := json.MarshalIndent(dep, "", " ") - if err != nil { - t.Errorf("failed to marshal deployment, %s", err) - } - - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Log(string(got)) - t.Errorf("unexpected deployment (-want, +got) = %v", diff) - } - }) - } -} - -func TestMakeFilterLabels(t *testing.T) { - testCases := map[string]struct { - name string - want map[string]string - }{ - "with name": { - name: "brokerName", - want: map[string]string{ - "eventing.knative.dev/broker": "brokerName", + DeployMeta: metav1.ObjectMeta{ + Namespace: "my-ns", + Name: "default-broker-filter", + Labels: map[string]string{ + "eventing.knative.dev/broker": "default", "eventing.knative.dev/brokerRole": "filter", }, }, - "name empty": { - name: "", - want: map[string]string{ - "eventing.knative.dev/broker": "", - "eventing.knative.dev/brokerRole": "filter", + PodSpec: corev1.PodSpec{ + ServiceAccountName: "my-serviceaccount", + Containers: []corev1.Container{ + { + Name: "filter", + Image: "image.example.com/filter", + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + Env: []corev1.EnvVar{ + { + Name: "SYSTEM_NAMESPACE", + Value: "my-system-ns", + }, + { + Name: "NAMESPACE", + Value: "my-ns", + }, + { + Name: "POD_NAME", + Value: "default-broker-filter", + }, + { + Name: "CONTAINER_NAME", + Value: "filter", + }, + { + Name: "BROKER", + Value: "default", + }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + }, + }, + }, }, }, } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - got := FilterLabels(tc.name) - - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Errorf("unexpected labels (-want, +got) = %v", diff) - } - }) + got := MakeFilterServiceArgs(&FilterArgs{ + Broker: b, + Image: "image.example.com/filter", + ServiceAccountName: "my-serviceaccount", + }) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected filter service arguments (-want, +got): %s", diff) } } diff --git a/pkg/reconciler/broker/resources/ingress.go b/pkg/reconciler/broker/resources/ingress.go index 5914a94d35f..c9e3ee2e752 100644 --- a/pkg/reconciler/broker/resources/ingress.go +++ b/pkg/reconciler/broker/resources/ingress.go @@ -19,14 +19,12 @@ package resources import ( "fmt" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/eventing/pkg/apis/eventing" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/pkg/kmeta" + "knative.dev/eventing/pkg/reconciler/internal/service" "knative.dev/pkg/system" ) @@ -42,125 +40,78 @@ type IngressArgs struct { ChannelAddress string } -// MakeIngress creates the in-memory representation of the Broker's ingress Deployment. -func MakeIngressDeployment(args *IngressArgs) *appsv1.Deployment { - return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ +// MakeIngressServiceArgs creates the in-memory representation of the Broker's ingress service arguments. +func MakeIngressServiceArgs(args *IngressArgs) *service.Args { + return &service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Namespace: args.Broker.Namespace, + Name: fmt.Sprintf("%s-broker", args.Broker.Name), + Labels: IngressLabels(args.Broker.Name), + }, + DeployMeta: metav1.ObjectMeta{ Namespace: args.Broker.Namespace, Name: fmt.Sprintf("%s-broker-ingress", args.Broker.Name), - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(args.Broker), - }, - Labels: IngressLabels(args.Broker.Name), + Labels: IngressLabels(args.Broker.Name), }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: IngressLabels(args.Broker.Name), - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: IngressLabels(args.Broker.Name), - }, - Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, - Containers: []corev1.Container{ - { - Image: args.Image, - Name: ingressContainerName, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, - }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, - }, - Env: []corev1.EnvVar{ - { - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "CONTAINER_NAME", - Value: ingressContainerName, - }, - { - Name: "FILTER", - Value: "", // TODO Add one. - }, - { - Name: "CHANNEL", - Value: args.ChannelAddress, - }, - { - Name: "BROKER", - Value: args.Broker.Name, - }, - // Used for StackDriver only. - { - Name: "METRICS_DOMAIN", - Value: "knative.dev/internal/eventing", - }, - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8080, - Name: "http", - }, - { - ContainerPort: 9090, - Name: "metrics", - }, + PodSpec: corev1.PodSpec{ + ServiceAccountName: args.ServiceAccountName, + Containers: []corev1.Container{ + { + Image: args.Image, + Name: ingressContainerName, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + // Port should be the same as the container port. }, }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + Env: []corev1.EnvVar{ + { + Name: system.NamespaceEnvKey, + Value: system.Namespace(), + }, + { + Name: "NAMESPACE", + Value: args.Broker.Namespace, + }, + { + Name: "POD_NAME", + Value: fmt.Sprintf("%s-broker-ingress", args.Broker.Name), + }, + { + Name: "CONTAINER_NAME", + Value: ingressContainerName, + }, + { + Name: "FILTER", + Value: "", // TODO Add one. + }, + { + Name: "CHANNEL", + Value: args.ChannelAddress, + }, + { + Name: "BROKER", + Value: args.Broker.Name, + }, + // Used for StackDriver only. + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + }, }, - }, - }, - }, - } -} - -// MakeIngressService creates the in-memory representation of the Broker's ingress Service. -func MakeIngressService(b *eventingv1alpha1.Broker) *corev1.Service { - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: b.Namespace, - // TODO add -ingress to the name to be consistent with the filter service naming. - Name: fmt.Sprintf("%s-broker", b.Name), - Labels: IngressLabels(b.Name), - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(b), - }, - }, - Spec: corev1.ServiceSpec{ - Selector: IngressLabels(b.Name), - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }, - { - Name: "http-metrics", - Port: 9090, }, }, }, diff --git a/pkg/reconciler/broker/resources/ingress_test.go b/pkg/reconciler/broker/resources/ingress_test.go new file mode 100644 index 00000000000..0559e854361 --- /dev/null +++ b/pkg/reconciler/broker/resources/ingress_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2020 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "os" + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/reconciler/internal/service" +) + +func TestMakeIngressServiceArgs(t *testing.T) { + _ = os.Setenv("SYSTEM_NAMESPACE", "my-system-ns") + b := &eventingv1alpha1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Namespace: "my-ns", + }, + } + + want := &service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Namespace: "my-ns", + Name: "default-broker", + Labels: map[string]string{ + "eventing.knative.dev/broker": "default", + "eventing.knative.dev/brokerRole": "ingress", + }, + }, + DeployMeta: metav1.ObjectMeta{ + Namespace: "my-ns", + Name: "default-broker-ingress", + Labels: map[string]string{ + "eventing.knative.dev/broker": "default", + "eventing.knative.dev/brokerRole": "ingress", + }, + }, + PodSpec: corev1.PodSpec{ + ServiceAccountName: "my-serviceaccount", + Containers: []corev1.Container{ + { + Name: "ingress", + Image: "image.example.com/ingress", + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + FailureThreshold: 3, + TimeoutSeconds: 10, + SuccessThreshold: 1, + }, + Env: []corev1.EnvVar{ + { + Name: "SYSTEM_NAMESPACE", + Value: "my-system-ns", + }, + { + Name: "NAMESPACE", + Value: "my-ns", + }, + { + Name: "POD_NAME", + Value: "default-broker-ingress", + }, + { + Name: "CONTAINER_NAME", + Value: "ingress", + }, + { + Name: "FILTER", + Value: "", + }, + { + Name: "CHANNEL", + Value: "chan.example.com", + }, + { + Name: "BROKER", + Value: "default", + }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + }, + }, + }, + }, + }, + } + + got := MakeIngressServiceArgs(&IngressArgs{ + Broker: b, + Image: "image.example.com/ingress", + ServiceAccountName: "my-serviceaccount", + ChannelAddress: "chan.example.com", + }) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected filter service arguments (-want, +got): %s", diff) + } +} diff --git a/pkg/reconciler/broker/trigger.go b/pkg/reconciler/broker/trigger.go index 1e10abe6e7e..fbd5e19289e 100644 --- a/pkg/reconciler/broker/trigger.go +++ b/pkg/reconciler/broker/trigger.go @@ -33,7 +33,6 @@ import ( messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler/broker/resources" - "knative.dev/eventing/pkg/reconciler/names" "knative.dev/eventing/pkg/reconciler/trigger/path" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -52,7 +51,7 @@ const ( triggerServiceFailed = "TriggerServiceFailed" ) -func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, filterSvc *corev1.Service) error { +func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, filterSvcURL *apis.URL) error { t.Status.InitializeConditions() if t.DeletionTimestamp != nil { @@ -84,7 +83,7 @@ func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t t.Status.SubscriberURI = subscriberURI t.Status.MarkSubscriberResolvedSucceeded() - sub, err := r.subscribeToBrokerChannel(ctx, b, t, brokerTrigger, filterSvc) + sub, err := r.subscribeToBrokerChannel(ctx, b, t, brokerTrigger, filterSvcURL) if err != nil { logging.FromContext(ctx).Error("Unable to Subscribe", zap.Error(err)) t.Status.MarkNotSubscribed("NotSubscribed", "%v", err) @@ -100,13 +99,14 @@ func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t } // subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. -func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, brokerTrigger *corev1.ObjectReference, svc *corev1.Service) (*messagingv1alpha1.Subscription, error) { - if svc == nil { - return nil, fmt.Errorf("service for broker is nil") +func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, brokerTrigger *corev1.ObjectReference, filterSvcURL *apis.URL) (*messagingv1alpha1.Subscription, error) { + if filterSvcURL == nil { + return nil, fmt.Errorf("service URL for broker is nil") } + uri := &apis.URL{ - Scheme: "http", - Host: names.ServiceHostName(svc.Name, svc.Namespace), + Scheme: filterSvcURL.Scheme, + Host: filterSvcURL.Host, Path: path.Generate(t), } // Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not diff --git a/pkg/reconciler/internal/service/base.go b/pkg/reconciler/internal/service/base.go new file mode 100644 index 00000000000..8c1454fb55f --- /dev/null +++ b/pkg/reconciler/internal/service/base.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" +) + +// Status represents the status of an addressable service. +type Status struct { + IsReady bool + URL *apis.URL + Reason string + Message string +} + +// Args is the arguments to reconcile an addressable service. +type Args struct { + ServiceMeta metav1.ObjectMeta + DeployMeta metav1.ObjectMeta + PodSpec corev1.PodSpec +} + +// Reconciler is the interface to reconcile addressable services. +type Reconciler interface { + // Reconcile reconciles a service. + Reconcile(context.Context, metav1.OwnerReference, Args) (*Status, error) + // GetStatus get the status of a service. + GetStatus(context.Context, metav1.ObjectMeta) (*Status, error) +} diff --git a/pkg/reconciler/internal/service/kube/kube.go b/pkg/reconciler/internal/service/kube/kube.go new file mode 100644 index 00000000000..fb8475727dd --- /dev/null +++ b/pkg/reconciler/internal/service/kube/kube.go @@ -0,0 +1,209 @@ +/* +Copyright 2020 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "fmt" + "strings" + + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + appsv1listers "k8s.io/client-go/listers/apps/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing/pkg/apis/duck" + "knative.dev/eventing/pkg/reconciler/internal/service" + "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" +) + +// ServiceReconciler reconciles addressable services implemented with +// k8s services and deployments. +type ServiceReconciler struct { + KubeClientSet kubernetes.Interface + ServiceLister corev1listers.ServiceLister + DeploymentLister appsv1listers.DeploymentLister +} + +var _ service.Reconciler = (*ServiceReconciler)(nil) + +// Reconcile reconciles an addressable service with k8s service and deployment. +func (r *ServiceReconciler) Reconcile(ctx context.Context, owner metav1.OwnerReference, args service.Args) (*service.Status, error) { + if err := validateArgs(args); err != nil { + return nil, err + } + fillDefaults(&args, owner) + + d := &v1.Deployment{ + ObjectMeta: args.DeployMeta, + Spec: v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: args.DeployMeta.Labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: args.DeployMeta.Labels, + }, + Spec: args.PodSpec, + }, + }, + } + rd, err := r.reconcileDeployment(ctx, d) + if err != nil { + return nil, err + } + + svc := &corev1.Service{ + ObjectMeta: args.ServiceMeta, + Spec: corev1.ServiceSpec{ + Selector: args.DeployMeta.Labels, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(int(args.PodSpec.Containers[0].Ports[0].ContainerPort)), + }, + }, + }, + } + rsvc, err := r.reconcileService(ctx, svc) + if err != nil { + return nil, err + } + + if duck.DeploymentIsAvailable(&rd.Status, true) { + return &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: names.ServiceHostName(rsvc.Name, rsvc.Namespace), + }, + }, nil + } + + return &service.Status{ + IsReady: false, + Reason: "DeploymentUnavailable", + Message: fmt.Sprintf("The Deployment %q is unavailable", rd.Name), + URL: &apis.URL{ + Scheme: "http", + Host: names.ServiceHostName(svc.Name, svc.Namespace), + }, + }, nil +} + +func validateArgs(args service.Args) error { + if !strings.HasPrefix(args.DeployMeta.Name, args.ServiceMeta.Name) { + return fmt.Errorf("service name must be a prefix of deployment name") + } + if len(args.PodSpec.Containers[0].Ports) == 0 { + args.PodSpec.Containers[0].Ports = []corev1.ContainerPort{ + { + ContainerPort: 8080, + }, + } + } + return nil +} + +// GetStatus returns the status of a k8s service. +func (r *ServiceReconciler) GetStatus(ctx context.Context, svcMeta metav1.ObjectMeta) (*service.Status, error) { + existing, err := r.ServiceLister.Services(svcMeta.Namespace).Get(svcMeta.Name) + if err != nil { + return nil, err + } + return &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: names.ServiceHostName(existing.Name, existing.Namespace), + }, + }, nil +} + +func fillDefaults(args *service.Args, owner metav1.OwnerReference) { + // Make sure the service metadata has proper owner reference. + args.ServiceMeta.OwnerReferences = append(args.ServiceMeta.OwnerReferences, owner) + args.DeployMeta.OwnerReferences = append(args.DeployMeta.OwnerReferences, owner) + args.PodSpec.Containers[0].Ports[0].Name = "http" + userPort := args.PodSpec.Containers[0].Ports[0].ContainerPort + + if args.PodSpec.Containers[0].LivenessProbe != nil { + if args.PodSpec.Containers[0].LivenessProbe.HTTPGet != nil { + args.PodSpec.Containers[0].LivenessProbe.HTTPGet.Port = intstr.FromInt(int(userPort)) + } + } + if args.PodSpec.Containers[0].ReadinessProbe != nil { + if args.PodSpec.Containers[0].ReadinessProbe.HTTPGet != nil { + args.PodSpec.Containers[0].ReadinessProbe.HTTPGet.Port = intstr.FromInt(int(userPort)) + } + } +} + +func (r *ServiceReconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) { + existing, err := r.DeploymentLister.Deployments(d.Namespace).Get(d.Name) + if apierrors.IsNotFound(err) { + existing, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d) + if err != nil { + return nil, fmt.Errorf("failed to create deployment: %w", err) + } + return existing, nil + } else if err != nil { + return nil, fmt.Errorf("failed to get existing deployment: %w", err) + } + + if !equality.Semantic.DeepDerivative(d.Spec, existing.Spec) { + // Don't modify the informers copy. + desired := existing.DeepCopy() + desired.Spec = d.Spec + existing, err = r.KubeClientSet.AppsV1().Deployments(existing.Namespace).Update(desired) + if err != nil { + return nil, fmt.Errorf("failed to update deployment: %w", err) + } + } + return existing, nil +} + +// reconcileService reconciles the K8s Service 'svc'. +func (r *ServiceReconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) { + existing, err := r.ServiceLister.Services(svc.Namespace).Get(svc.Name) + if apierrors.IsNotFound(err) { + existing, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc) + if err != nil { + return nil, fmt.Errorf("failed to create service: %w", err) + } + return existing, nil + } else if err != nil { + return nil, fmt.Errorf("failed to get existing service: %w", err) + } + + // spec.clusterIP is immutable and is set on existing services. If we don't set this to the same value, we will + // encounter an error while updating. + svc.Spec.ClusterIP = existing.Spec.ClusterIP + if !equality.Semantic.DeepDerivative(svc.Spec, existing.Spec) { + // Don't modify the informers copy. + desired := existing.DeepCopy() + desired.Spec = svc.Spec + existing, err = r.KubeClientSet.CoreV1().Services(existing.Namespace).Update(desired) + if err != nil { + return nil, fmt.Errorf("failed to update service: %w", err) + } + } + return existing, nil +} diff --git a/pkg/reconciler/internal/service/kube/kube_test.go b/pkg/reconciler/internal/service/kube/kube_test.go new file mode 100644 index 00000000000..a55ed4e85c8 --- /dev/null +++ b/pkg/reconciler/internal/service/kube/kube_test.go @@ -0,0 +1,563 @@ +/* +Copyright 2020 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + clientgotesting "k8s.io/client-go/testing" + "knative.dev/eventing/pkg/reconciler/internal/service" + "knative.dev/pkg/apis" + fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" + "knative.dev/pkg/logging" + logtesting "knative.dev/pkg/logging/testing" + + . "knative.dev/eventing/pkg/reconciler/testing" + . "knative.dev/pkg/reconciler/testing" +) + +func TestReconcile(t *testing.T) { + cases := []struct { + name string + objects []runtime.Object + args service.Args + reactors []clientgotesting.ReactionFunc + wantCreates []runtime.Object + wantUpdates []runtime.Object + wantStatus *service.Status + wantErr bool + }{{ + name: "already reconciled do nothing", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8000), + ), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantStatus: &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: "my-svc.my-ns.svc.cluster.local", + }, + }, + }, { + name: "successful create", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8000), + ), + }, + wantCreates: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantStatus: &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: "my-svc.my-ns.svc.cluster.local", + }, + }, + }, { + name: "successful update", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image2.example.com", + WithPodContainerPort("", 8000), + ), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantUpdates: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image2.example.com", WithPodContainerPort("http", 8000))), + ), + }, + wantStatus: &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: "my-svc.my-ns.svc.cluster.local", + }, + }, + }, { + name: "deployment not available", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8000), + ), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + WithDeploymentNotAvailable(), + ), + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantStatus: &service.Status{ + IsReady: false, + Message: `The Deployment "my-svc-deploy" is unavailable`, + Reason: "DeploymentUnavailable", + URL: &apis.URL{ + Scheme: "http", + Host: "my-svc.my-ns.svc.cluster.local", + }, + }, + }, { + name: "create deployment error", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8000), + ), + }, + reactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "deployments"), + }, + wantCreates: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + }, + wantErr: true, + }, { + name: "update deployment error", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image2.example.com", + WithPodContainerPort("", 8000), + ), + }, + reactors: []clientgotesting.ReactionFunc{ + InduceFailure("update", "deployments"), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + }, + wantUpdates: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image2.example.com", WithPodContainerPort("http", 8000))), + ), + }, + wantErr: true, + }, { + name: "create service error", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8000), + ), + }, + reactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "services"), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8000))), + ), + }, + wantCreates: []runtime.Object{ + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantErr: true, + }, { + name: "update service error", + args: service.Args{ + ServiceMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + DeployMeta: metav1.ObjectMeta{ + Name: "my-svc-deploy", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + PodSpec: MakePodSpec( + "user-container", + "image.example.com", + WithPodContainerPort("", 8001), + ), + }, + reactors: []clientgotesting.ReactionFunc{ + InduceFailure("update", "services"), + }, + objects: []runtime.Object{ + NewDeployment("my-svc-deploy", "my-ns", + WithDeploymentOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithDeploymentLabels(map[string]string{"app": "test"}), + WithDeploymentPodSpec(MakePodSpec("user-container", "image.example.com", WithPodContainerPort("http", 8001))), + ), + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + wantUpdates: []runtime.Object{ + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8001)}}), + ), + }, + wantErr: true, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx = logging.WithLogger(ctx, logtesting.TestLogger(t)) + + ls := NewListers(tc.objects) + ctx, kubeClient := fakekubeclient.With(ctx, ls.GetKubeObjects()...) + + for _, reactor := range tc.reactors { + kubeClient.PrependReactor("*", "*", reactor) + } + recorderList := ActionRecorderList{kubeClient} + + svcReconciler := &ServiceReconciler{ + KubeClientSet: kubeClient, + DeploymentLister: ls.GetDeploymentLister(), + ServiceLister: ls.GetServiceLister(), + } + + status, err := svcReconciler.Reconcile(ctx, MakeOwnerReference(), tc.args) + if (err != nil) != tc.wantErr { + t.Error("Service reconcile got err=nil want err") + } + + if diff := cmp.Diff(tc.wantStatus, status, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected create (-want, +got): %s", diff) + } + + actions, err := recorderList.ActionsByVerb() + if err != nil { + t.Errorf("Error capturing actions by verb: %q", err) + } + + for i, want := range tc.wantCreates { + if i >= len(actions.Creates) { + t.Errorf("Missing create: %#v", want) + continue + } + got := actions.Creates[i] + obj := got.GetObject() + + if diff := cmp.Diff(want, obj, ignoreLastTransitionTime, safeDeployDiff, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected create (-want, +got): %s", diff) + } + } + if got, want := len(actions.Creates), len(tc.wantCreates); got > want { + for _, extra := range actions.Creates[want:] { + t.Errorf("Extra create: %#v", extra.GetObject()) + } + } + + for i, want := range tc.wantUpdates { + if i >= len(actions.Updates) { + t.Errorf("Missing update: %#v", want) + continue + } + got := actions.Updates[i] + obj := got.GetObject() + + if diff := cmp.Diff(want, obj, ignoreLastTransitionTime, safeDeployDiff, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected update (-want, +got): %s", diff) + } + } + if got, want := len(actions.Updates), len(tc.wantUpdates); got > want { + for _, extra := range actions.Updates[want:] { + t.Errorf("Extra update: %#v", extra.GetObject()) + } + } + }) + } +} + +func TestGetStatus(t *testing.T) { + cases := []struct { + name string + objects []runtime.Object + reactors []clientgotesting.ReactionFunc + svcMeta metav1.ObjectMeta + wantStatus *service.Status + wantErr bool + }{{ + name: "successfully get status", + objects: []runtime.Object{ + NewService("my-svc", "my-ns", + WithServiceOwnerReferences([]metav1.OwnerReference{MakeOwnerReference()}), + WithServiceLabels(map[string]string{"app": "test"}), + WithServicePorts([]corev1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8000)}}), + ), + }, + svcMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + wantStatus: &service.Status{ + IsReady: true, + URL: &apis.URL{ + Scheme: "http", + Host: "my-svc.my-ns.svc.cluster.local", + }, + }, + }, { + name: "not found", + svcMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + wantErr: true, + }, { + name: "get service error", + svcMeta: metav1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-ns", + Labels: map[string]string{"app": "test"}, + }, + reactors: []clientgotesting.ReactionFunc{ + InduceFailure("get", "services"), + }, + wantErr: true, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx = logging.WithLogger(ctx, logtesting.TestLogger(t)) + + ls := NewListers(tc.objects) + ctx, kubeClient := fakekubeclient.With(ctx, ls.GetKubeObjects()...) + + for _, reactor := range tc.reactors { + kubeClient.PrependReactor("*", "*", reactor) + } + + svcReconciler := &ServiceReconciler{ + KubeClientSet: kubeClient, + DeploymentLister: ls.GetDeploymentLister(), + ServiceLister: ls.GetServiceLister(), + } + + status, err := svcReconciler.GetStatus(ctx, tc.svcMeta) + if (err != nil) != tc.wantErr { + t.Error("Service reconcile got err=nil want err") + } + if diff := cmp.Diff(tc.wantStatus, status, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected create (-want, +got): %s", diff) + } + }) + } +} + +func MakeOwnerReference() metav1.OwnerReference { + trueVal := true + return metav1.OwnerReference{ + APIVersion: "test.example.com/v1", + Kind: "owner", + Name: "owner", + BlockOwnerDeletion: &trueVal, + Controller: &trueVal, + } +} + +type PodSpecOption func(*corev1.PodSpec) + +func MakePodSpec(name, image string, opts ...PodSpecOption) corev1.PodSpec { + p := &corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: name, + Image: image, + }, + }, + } + + for _, opt := range opts { + opt(p) + } + return *p +} + +func WithPodContainerPort(name string, port int32) PodSpecOption { + return func(p *corev1.PodSpec) { + p.Containers[0].Ports = []corev1.ContainerPort{ + { + Name: name, + ContainerPort: port, + }, + } + } +} + +func WithPodContainerProbes(liveness, readiness *corev1.Probe) PodSpecOption { + return func(p *corev1.PodSpec) { + p.Containers[0].LivenessProbe = liveness + p.Containers[0].ReadinessProbe = readiness + } +} + +var ( + ignoreLastTransitionTime = cmp.FilterPath(func(p cmp.Path) bool { + return strings.HasSuffix(p.String(), "LastTransitionTime.Inner.Time") + }, cmp.Ignore()) + + safeDeployDiff = cmpopts.IgnoreUnexported(resource.Quantity{}) +) diff --git a/pkg/reconciler/testing/deployment.go b/pkg/reconciler/testing/deployment.go index 8a7e6b46307..9d8efd09634 100644 --- a/pkg/reconciler/testing/deployment.go +++ b/pkg/reconciler/testing/deployment.go @@ -96,3 +96,20 @@ func WithDeploymentAvailable() DeploymentOption { } } } + +func WithDeploymentPodSpec(podSpec corev1.PodSpec) DeploymentOption { + return func(d *appsv1.Deployment) { + d.Spec.Template.Spec = podSpec + } +} + +func WithDeploymentNotAvailable() DeploymentOption { + return func(d *appsv1.Deployment) { + d.Status.Conditions = []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + }, + } + } +}