diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index e36a5ad181f..aba5f0dfb30 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -23,6 +23,7 @@ import ( "reflect" "time" + status "github.com/knative/eventing/pkg/apis/duck" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/apis/sources/v1alpha1" eventinglisters "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" @@ -40,7 +41,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" appsv1listers "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" "knative.dev/pkg/controller" @@ -48,9 +48,11 @@ import ( const ( // Name of the corev1.Events emitted from the reconciliation process - cronjobReconciled = "CronJobSourceReconciled" - cronJobReadinessChanged = "CronJobSourceReadinessChanged" - cronjobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" + cronJobReconciled = "CronJobSourceReconciled" + cronJobReadinessChanged = "CronJobSourceReadinessChanged" + cronJobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" + cronJobSourceDeploymentCreated = "CronJobSurceDeploymentCreated" + cronJobSourceDeploymentUpdated = "CronJobSourceDeploymentUpdated" // raImageEnvVar is the name of the environment variable that contains the receive adapter's // image. It must be defined. @@ -104,12 +106,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { logging.FromContext(ctx).Warn("Error reconciling CronJobSource", zap.Error(err)) } else { logging.FromContext(ctx).Debug("CronJobSource reconciled") - r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronjobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name) + r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronJobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name) } if _, updateStatusErr := r.updateStatus(ctx, cronjob.DeepCopy()); updateStatusErr != nil { logging.FromContext(ctx).Warn("Failed to update the CronJobSource", zap.Error(err)) - r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronjobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err) + r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronJobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err) return updateStatusErr } @@ -155,11 +157,18 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou ra, err := r.createReceiveAdapter(ctx, cronjob, sinkURI) if err != nil { - r.Logger.Error("Unable to create the receive adapter", zap.Error(err)) + logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err)) return fmt.Errorf("creating receive adapter: %v", err) } cronjob.Status.PropagateDeploymentAvailability(ra) + // TODO Delete this after 0.8 is cut. + if status.DeploymentIsAvailable(&ra.Status, true) { + err = r.deleteOldReceiveAdapter(ctx, cronjob) + if err != nil { + return fmt.Errorf("deleting old receive adapter: %v", err) + } + } _, err = r.reconcileEventType(ctx, cronjob) if err != nil { cronjob.Status.MarkNoEventType("EventTypeReconcileFailed", "") @@ -167,6 +176,16 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou } cronjob.Status.MarkEventType() + // TODO Delete this after 0.8 is cut. + oldEventType, err := r.getOldEventType(ctx, cronjob) + if err != nil { + return fmt.Errorf("getting old event type: %v", err) + } else if oldEventType != nil { + if err = r.EventingClientSet.EventingV1alpha1().EventTypes(cronjob.Namespace).Delete(oldEventType.Name, &metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("deleting old event type: %v", err) + } + } + return nil } @@ -201,16 +220,10 @@ func checkResourcesStatus(src *v1alpha1.CronJobSource) error { } func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource, sinkURI string) (*appsv1.Deployment, error) { - if err := checkResourcesStatus(src); err != nil { return nil, err } - ra, err := r.getReceiveAdapter(ctx, src) - if err != nil && !apierrors.IsNotFound(err) { - logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err)) - return nil, fmt.Errorf("getting receive adapter: %v", err) - } adapterArgs := resources.ReceiveAdapterArgs{ Image: r.env.Image, Source: src, @@ -218,23 +231,30 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro SinkURI: sinkURI, } expected := resources.MakeReceiveAdapter(&adapterArgs) - if ra != nil { - if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { - ra.Spec.Template.Spec = expected.Spec.Template.Spec - if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { - return ra, fmt.Errorf("updating receive adapter: %v", err) - } - logging.FromContext(ctx).Info("Receive Adapter updated.", zap.Any("receiveAdapter", ra)) + + ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) + msg := "Deployment created" + if err != nil { + msg = fmt.Sprintf("Deployment created, error: %v", err) } - logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) + r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "%s", msg) + return ra, err + } else if err != nil { + return nil, fmt.Errorf("error getting receive adapter: %v", err) + } else if !metav1.IsControlledBy(ra, src) { + return nil, fmt.Errorf("deployment %q is not owned by CronJobSource %q", ra.Name, src.Name) + } else if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { + ra.Spec.Template.Spec = expected.Spec.Template.Spec + if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { + return ra, err + } + r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentUpdated, "Deployment %q updated", ra.Name) return ra, nil + } else { + logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) } - - ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) - if err != nil { - return nil, fmt.Errorf("creating receive adapter: %v", err) - } - logging.FromContext(ctx).Info("Receive Adapter created.", zap.Any("receiveAdapter", ra)) return ra, nil } @@ -253,24 +273,27 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { return false } -func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) (*appsv1.Deployment, error) { +func (r *Reconciler) deleteOldReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) error { dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{ - LabelSelector: r.getLabelSelector(src).String(), + LabelSelector: labels.SelectorFromSet(resources.OldLabels(src.Name)).String(), }) if err != nil { - logging.FromContext(ctx).Error("Unable to list CronJobs: %v", zap.Error(err)) - return nil, fmt.Errorf("listing CronJobs: %v", err) + return fmt.Errorf("listing old receive adapter: %v", err) } - for _, dep := range dl.Items { - if metav1.IsControlledBy(&dep, src) { - return &dep, nil + for _, ora := range dl.Items { + if metav1.IsControlledBy(&ora, src) { + err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Delete(ora.Name, &metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("deleting old receive adapter %q: %v", ora.Name, err) + } } } - return nil, apierrors.NewNotFound(schema.GroupResource{}, "") + return nil } func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { - current, err := r.getEventType(ctx, src) + expected := resources.MakeEventType(src) + current, err := r.eventTypeLister.EventTypes(src.Namespace).Get(expected.Name) if err != nil && !apierrors.IsNotFound(err) { logging.FromContext(ctx).Error("Unable to get an existing event type", zap.Error(err)) return nil, fmt.Errorf("getting event types: %v", err) @@ -281,7 +304,7 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ if src.Spec.Sink.Kind != "Broker" { if current != nil { if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) return nil, fmt.Errorf("deleting event type: %v", err) } } @@ -289,29 +312,29 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ return nil, nil } - expected := resources.MakeEventType(src) if current != nil { if equality.Semantic.DeepEqual(expected.Spec, current.Spec) { return current, nil } // EventTypes are immutable, delete it and create it again. if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) return nil, fmt.Errorf("deleting event type: %v", err) } } + current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected) if err != nil { - logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error creating event type", zap.Error(err), zap.Any("eventType", expected)) return nil, fmt.Errorf("creating event type: %v", err) } logging.FromContext(ctx).Debug("EventType created", zap.Any("eventType", current)) return current, nil } -func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { +func (r *Reconciler) getOldEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { etl, err := r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).List(metav1.ListOptions{ - LabelSelector: r.getLabelSelector(src).String(), + LabelSelector: r.getOldLabelSelector(src).String(), }) if err != nil { logging.FromContext(ctx).Error("Unable to list event types: %v", zap.Error(err)) @@ -322,11 +345,11 @@ func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSour return &et, nil } } - return nil, apierrors.NewNotFound(schema.GroupResource{}, "") + return nil, nil } -func (r *Reconciler) getLabelSelector(src *v1alpha1.CronJobSource) labels.Selector { - return labels.SelectorFromSet(resources.Labels(src.Name)) +func (r *Reconciler) getOldLabelSelector(src *v1alpha1.CronJobSource) labels.Selector { + return labels.SelectorFromSet(resources.OldLabels(src.Name)) } func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJobSource) (*v1alpha1.CronJobSource, error) { @@ -349,7 +372,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob cj, err := r.EventingClientSet.SourcesV1alpha1().CronJobSources(desired.Namespace).UpdateStatus(existing) if err == nil && becomesReady { duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) - r.Logger.Infof("CronJobSource %q became ready after %v", cronjob.Name, duration) + logging.FromContext(ctx).Info("CronJobSource became ready after", zap.Duration("duration", duration)) r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name)) if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil { logging.FromContext(ctx).Error("Failed to record ready for CronJobSource", zap.Error(recorderErr)) diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index 7cfaad0d3b2..d3530142375 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -62,14 +62,17 @@ var ( APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "CronJobSource", Name: sourceName, + UID: sourceUID, Controller: &trueVal, BlockOwnerDeletion: &trueVal, } + eventTypeName = fmt.Sprintf("dev.knative.cronjob.event-%s", sourceUID) ) const ( image = "github.com/knative/test/image" sourceName = "test-cronjob-source" + sourceUID = "1234" testNS = "testnamespace" testSchedule = "*/2 * * * *" testData = "data" @@ -97,7 +100,7 @@ func TestAllCases(t *testing.T) { }, { Name: "invalid schedule", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: "invalid schedule", Data: testData, @@ -111,7 +114,7 @@ func TestAllCases(t *testing.T) { // Eventf(corev1.EventTypeWarning, "Fail", ""), // TODO: BUGBUGBUG This should make an event. //}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: "invalid schedule", Data: testData, @@ -125,7 +128,7 @@ func TestAllCases(t *testing.T) { }, { Name: "missing sink", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -136,7 +139,7 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -151,7 +154,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -170,7 +173,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -188,7 +191,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid with event type creation", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -207,7 +210,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -223,8 +226,7 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewEventType("", testNS, - WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), + NewEventType(eventTypeName, testNS, WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -234,7 +236,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid with event type deletion and creation", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -245,7 +247,7 @@ func TestAllCases(t *testing.T) { WithInitBrokerConditions, WithBrokerAddress(sinkDNS), ), - NewEventType("name-1", testNS, + NewEventType(eventTypeName, testNS, WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType("type-1"), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -259,7 +261,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -275,11 +277,10 @@ func TestAllCases(t *testing.T) { ), }}, WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: "name-1", + Name: eventTypeName, }}, WantCreates: []runtime.Object{ - NewEventType("", testNS, - WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), + NewEventType(eventTypeName, testNS, WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -289,7 +290,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid, existing ra", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -308,7 +309,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -326,7 +327,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid, no change", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -350,9 +351,9 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReconciled", `CronJobSource reconciled: "%s/%s"`, testNS, sourceName), }, }, { - Name: "valid with event type deletion", + Name: "valid with old event type deletion", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -372,7 +373,7 @@ func TestAllCases(t *testing.T) { ), makeAvailableReceiveAdapter(sinkRef), NewEventType("name-1", testNS, - WithEventTypeLabels(resources.Labels(sourceName)), + WithEventTypeLabels(resources.OldLabels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), WithEventTypeBroker(sinkName), @@ -385,6 +386,42 @@ func TestAllCases(t *testing.T) { WantDeletes: []clientgotesting.DeleteActionImpl{{ Name: "name-1", }}, + }, { + Name: "valid with event type deletion", + Objects: []runtime.Object{ + NewCronJobSource(sourceName, testNS, sourceUID, + WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ + Schedule: testSchedule, + Data: testData, + Sink: &sinkRef, + }), + WithInitCronJobSourceConditions, + WithValidCronJobSourceSchedule, + WithValidCronJobSourceResources, + WithValidCronJobSourceResources, + WithCronJobSourceDeployed, + WithCronJobSourceSink(sinkURI), + WithCronJobSourceEventType, + ), + NewChannel(sinkName, testNS, + WithInitChannelConditions, + WithChannelAddress(sinkDNS), + ), + makeAvailableReceiveAdapter(sinkRef), + NewEventType(eventTypeName, testNS, + WithEventTypeLabels(resources.Labels(sourceName)), + WithEventTypeType(sourcesv1alpha1.CronJobEventType), + WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), + WithEventTypeBroker(sinkName), + WithEventTypeOwnerReference(ownerRef)), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "CronJobSourceReconciled", `CronJobSource reconciled: "%s/%s"`, testNS, sourceName), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{{ + Name: eventTypeName, + }}, }, } @@ -406,10 +443,6 @@ func TestAllCases(t *testing.T) { )) } -func makeReceiveAdapter() *appsv1.Deployment { - return makeReceiveAdapterWithSink(sinkRef) -} - func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment { ra := makeReceiveAdapterWithSink(ref) WithDeploymentAvailable()(ra) @@ -417,7 +450,7 @@ func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment } func makeReceiveAdapterWithSink(ref corev1.ObjectReference) *appsv1.Deployment { - source := NewCronSourceJob(sourceName, testNS, + source := NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, diff --git a/pkg/reconciler/cronjobsource/resources/eventtype.go b/pkg/reconciler/cronjobsource/resources/eventtype.go index 45d1f6e5660..dc6a86fc232 100644 --- a/pkg/reconciler/cronjobsource/resources/eventtype.go +++ b/pkg/reconciler/cronjobsource/resources/eventtype.go @@ -17,8 +17,6 @@ limitations under the License. package resources import ( - "fmt" - "knative.dev/pkg/kmeta" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -31,9 +29,9 @@ import ( func MakeEventType(src *v1alpha1.CronJobSource) *eventingv1alpha1.EventType { return &eventingv1alpha1.EventType{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(v1alpha1.CronJobEventType)), - Labels: Labels(src.Name), - Namespace: src.Namespace, + Name: utils.GenerateFixedName(src, utils.ToDNS1123Subdomain(v1alpha1.CronJobEventType)), + Labels: Labels(src.Name), + Namespace: src.Namespace, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(src), }, diff --git a/pkg/reconciler/cronjobsource/resources/labels.go b/pkg/reconciler/cronjobsource/resources/labels.go index 04dc6098fda..cd36a5933bb 100644 --- a/pkg/reconciler/cronjobsource/resources/labels.go +++ b/pkg/reconciler/cronjobsource/resources/labels.go @@ -22,9 +22,18 @@ const ( controllerAgentName = "cronjob-source-controller" ) -func Labels(name string) map[string]string { +// OldLabels are the pre-0.8 labels. +// TODO Delete after 0.8 is cut. +func OldLabels(name string) map[string]string { return map[string]string{ "knative-eventing-source": controllerAgentName, "knative-eventing-source-name": name, } } + +// Labels are the labels attached to all resources based on a CronJobSource. +func Labels(name string) map[string]string { + return map[string]string{ + "sources.eventing.knative.dev/cronJobSource": name, + } +} diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter.go b/pkg/reconciler/cronjobsource/resources/receive_adapter.go index f0a08e3e726..dd5533b56c1 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter.go @@ -19,14 +19,13 @@ package resources import ( "fmt" - "knative.dev/pkg/kmeta" - + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "github.com/knative/eventing/pkg/utils" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "knative.dev/pkg/kmeta" ) var ( @@ -76,9 +75,9 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Namespace: args.Source.Namespace, - GenerateName: fmt.Sprintf("cronjob-%s-", args.Source.Name), - Labels: args.Labels, + Namespace: args.Source.Namespace, + Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", args.Source.Name)), + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), }, diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go index be84a26ae70..7ad9c9ca7c6 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" v1 "k8s.io/api/apps/v1" @@ -33,6 +34,7 @@ func TestMakeReceiveAdapter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "source-name", Namespace: "source-namespace", + UID: "source-uid", }, Spec: v1alpha1.CronJobSourceSpec{ ServiceAccountName: "source-svc-acct", @@ -55,8 +57,8 @@ func TestMakeReceiveAdapter(t *testing.T) { yes := true want := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "source-namespace", - GenerateName: "cronjob-source-name-", + Namespace: "source-namespace", + Name: fmt.Sprintf("cronjobsource-%s-%s", src.Name, src.UID), Labels: map[string]string{ "test-key1": "test-value1", "test-key2": "test-value2", @@ -64,7 +66,8 @@ func TestMakeReceiveAdapter(t *testing.T) { OwnerReferences: []metav1.OwnerReference{{ APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "CronJobSource", - Name: "source-name", + Name: src.Name, + UID: src.UID, Controller: &yes, BlockOwnerDeletion: &yes, }}, @@ -130,59 +133,8 @@ func TestMakeReceiveAdapter(t *testing.T) { } if diff, err := kmp.SafeDiff(want, got); err != nil { - t.Errorf("unexpected cron job (-want, +got) = %v", diff) + t.Errorf("unexpected cron job resources (-want, +got) = %v", err) + } else if diff != "" { + t.Errorf("Unexpected deployment (-want +got) = %v", diff) } - src.Spec.Resources = v1alpha1.CronJobResourceSpec{ - Requests: v1alpha1.CronJobRequestsSpec{ - ResourceCPU: "101m", - ResourceMemory: "200Mi", - }, - Limits: v1alpha1.CronJobLimitsSpec{ - ResourceCPU: "102m", - ResourceMemory: "500Mi", - }, - } - want.Spec.Template.Spec.Containers = []corev1.Container{ - { - Name: "receive-adapter", - Image: "test-image", - Env: []corev1.EnvVar{ - { - Name: "SCHEDULE", - Value: "*/2 * * * *", - }, - { - Name: "DATA", - Value: "data", - }, - { - Name: "SINK_URI", - Value: "sink-uri", - }, - { - Name: "NAME", - Value: "source-name", - }, - { - Name: "NAMESPACE", - Value: "source-namespace", - }, - }, - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("101m"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("102m"), - corev1.ResourceMemory: resource.MustParse("500Mi"), - }, - }, - }, - } - - if diff, err := kmp.SafeDiff(want, got); err != nil { - t.Errorf("unexpected cron job resources (-want, +got) = %v", diff) - } - } diff --git a/pkg/reconciler/testing/cronjobsource.go b/pkg/reconciler/testing/cronjobsource.go index 5f976a540e5..a4ed9bd13c7 100644 --- a/pkg/reconciler/testing/cronjobsource.go +++ b/pkg/reconciler/testing/cronjobsource.go @@ -19,30 +19,31 @@ package testing import ( "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // CronJobSourceOption enables further configuration of a CronJob. type CronJobSourceOption func(*v1alpha1.CronJobSource) -// NewCronJob creates a CronJob with CronJobOptions -func NewCronSourceJob(name, namespace string, o ...CronJobSourceOption) *v1alpha1.CronJobSource { +// NewCronJobSource creates a CronJobSource with CronJobOptions. +func NewCronJobSource(name, namespace, uid string, o ...CronJobSourceOption) *v1alpha1.CronJobSource { c := &v1alpha1.CronJobSource{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + UID: types.UID(uid), }, } for _, opt := range o { opt(c) } - //c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. + // c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. return c } -// WithInitCronJobConditions initializes the CronJobSource's conditions. +// WithInitCronJobSourceConditions initializes the CronJobSource's conditions. func WithInitCronJobSourceConditions(s *v1alpha1.CronJobSource) { s.Status.InitializeConditions() } diff --git a/pkg/reconciler/testing/eventtype.go b/pkg/reconciler/testing/eventtype.go index 0d31841a7af..521a625ffc0 100644 --- a/pkg/reconciler/testing/eventtype.go +++ b/pkg/reconciler/testing/eventtype.go @@ -47,12 +47,6 @@ func WithInitEventTypeConditions(et *v1alpha1.EventType) { et.Status.InitializeConditions() } -func WithEventTypeGenerateName(generateName string) EventTypeOption { - return func(et *v1alpha1.EventType) { - et.ObjectMeta.GenerateName = generateName - } -} - func WithEventTypeSource(source string) EventTypeOption { return func(et *v1alpha1.EventType) { et.Spec.Source = source