From ddd93dbb5b6d9924927cd009b1f901e196a16be7 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 2 Aug 2019 14:53:38 -0700 Subject: [PATCH 01/14] Random CronJobSource improvements. --- pkg/reconciler/cronjobsource/controller.go | 9 +- pkg/reconciler/cronjobsource/cronjobsource.go | 95 +++++++--------- .../cronjobsource/cronjobsource_test.go | 12 +- pkg/reconciler/cronjobsource/env_lookup.go | 64 +++++++++++ .../cronjobsource/env_lookup_test.go | 107 ++++++++++++++++++ .../resources/receive_adapter.go | 9 +- 6 files changed, 226 insertions(+), 70 deletions(-) create mode 100644 pkg/reconciler/cronjobsource/env_lookup.go create mode 100644 pkg/reconciler/cronjobsource/env_lookup_test.go diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index 7f325d4179d..b333eee2ce1 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -51,10 +51,11 @@ func NewController( eventTypeInformer := eventtypeinformer.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - cronjobLister: cronJobSourceInformer.Lister(), - deploymentLister: deploymentInformer.Lister(), - eventTypeLister: eventTypeInformer.Lister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + cronjobLister: cronJobSourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + eventTypeLister: eventTypeInformer.Lister(), + receiveAdapterImage: newEnvLookup(raImageEnvVar), } impl := controller.NewImpl(r, r.Logger, ReconcilerName) r.sinkReconciler = duck.NewInjectionSinkReconciler(ctx, impl.EnqueueKey) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 31468f84b5b..b7dd51931fa 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -18,10 +18,9 @@ package cronjobsource import ( "context" + "errors" "fmt" - "os" "reflect" - "sync" "time" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -61,8 +60,7 @@ const ( type Reconciler struct { *reconciler.Base - receiveAdapterImage string - once sync.Once + receiveAdapterImage EnvLookup // listers index properties about resources cronjobLister listers.CronJobSourceLister @@ -133,13 +131,13 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou _, err := cron.ParseStandard(cronjob.Spec.Schedule) if err != nil { cronjob.Status.MarkInvalidSchedule("Invalid", "") - return err + return fmt.Errorf("invalid schedule: %v", err) } cronjob.Status.MarkSchedule() if cronjob.Spec.Sink == nil { cronjob.Status.MarkNoSink("Missing", "Sink missing from spec") - return fmt.Errorf("Sink missing from spec") + return errors.New("spec.sink missing") } sinkObjRef := cronjob.Spec.Sink @@ -147,44 +145,31 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou sinkObjRef.Namespace = cronjob.Namespace } - cronjobDesc := cronjob.Namespace + "/" + cronjob.Name + ", " + cronjob.GroupVersionKind().String() + cronjobDesc := fmt.Sprintf("%s/%s,%s", cronjob.Namespace, cronjob.Name, cronjob.GroupVersionKind().String()) sinkURI, err := r.sinkReconciler.GetSinkURI(sinkObjRef, cronjob, cronjobDesc) if err != nil { cronjob.Status.MarkNoSink("NotFound", "") - return err + return fmt.Errorf("getting sink URI: %v", err) } cronjob.Status.MarkSink(sinkURI) _, err = r.createReceiveAdapter(ctx, cronjob, sinkURI) if err != nil { r.Logger.Error("Unable to create the receive adapter", zap.Error(err)) - return err + return fmt.Errorf("creating receive adapter: %v", err) } cronjob.Status.MarkDeployed() _, err = r.reconcileEventType(ctx, cronjob) if err != nil { cronjob.Status.MarkNoEventType("EventTypeReconcileFailed", "") - return err + return fmt.Errorf("reconciling event types: %v", err) } cronjob.Status.MarkEventType() return nil } -func (r *Reconciler) getReceiveAdapterImage() string { - if r.receiveAdapterImage == "" { - r.once.Do(func() { - raImage, defined := os.LookupEnv(raImageEnvVar) - if !defined { - panic(fmt.Errorf("required environment variable %q not defined", raImageEnvVar)) - } - r.receiveAdapterImage = raImage - }) - } - return r.receiveAdapterImage -} - func checkResourcesStatus(src *v1alpha1.CronJobSource) error { for _, rsrc := range []struct { @@ -206,8 +191,8 @@ func checkResourcesStatus(src *v1alpha1.CronJobSource) error { // In the event the field isn't specified, we assign a default in the receive_adapter if rsrc.field != "" { if _, err := resource.ParseQuantity(rsrc.field); err != nil { - src.Status.MarkResourcesIncorrect("Incorrect Resource", "%s: %s, Error: %s", rsrc.key, rsrc.field, err) - return err + src.Status.MarkResourcesIncorrect("Incorrect Resource", "%s: %q, Error: %s", rsrc.key, rsrc.field, err) + return fmt.Errorf("incorrect resource specification, %s: %q: %v", rsrc.key, rsrc.field, err) } } } @@ -224,10 +209,10 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro ra, err := r.getReceiveAdapter(ctx, src) if err != nil && !apierrors.IsNotFound(err) { logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err)) - return nil, err + return nil, fmt.Errorf("getting receive adapter: %v", err) } adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.getReceiveAdapterImage(), + Image: r.receiveAdapterImage.GetValue(), Source: src, Labels: resources.Labels(src.Name), SinkURI: sinkURI, @@ -237,23 +222,23 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro if r.podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { ra.Spec.Template.Spec = expected.Spec.Template.Spec if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { - return ra, err + return ra, fmt.Errorf("updating receive adapter: %v", err) } logging.FromContext(ctx).Info("Receive Adapter updated.", zap.Any("receiveAdapter", ra)) - } else { - logging.FromContext(ctx).Info("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) } + logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) return ra, nil } - if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected); err != nil { - return nil, err + ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) + if err != nil { + return nil, fmt.Errorf("creating receive adapter: %v", err) } - logging.FromContext(ctx).Info("Receive Adapter created.", zap.Any("receiveAdapter", expected)) - return ra, err + logging.FromContext(ctx).Info("Receive Adapter created.", zap.Any("receiveAdapter", ra)) + return ra, nil } -func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { +func (*Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) { return true } @@ -273,8 +258,8 @@ func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJo LabelSelector: r.getLabelSelector(src).String(), }) if err != nil { - logging.FromContext(ctx).Error("Unable to list cronjobs: %v", zap.Error(err)) - return nil, err + logging.FromContext(ctx).Error("Unable to list CronJobs: %v", zap.Error(err)) + return nil, fmt.Errorf("listing CronJobs: %v", err) } for _, dep := range dl.Items { if metav1.IsControlledBy(&dep, src) { @@ -288,7 +273,7 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ current, err := r.getEventType(ctx, src) if err != nil && !apierrors.IsNotFound(err) { logging.FromContext(ctx).Error("Unable to get an existing event type", zap.Error(err)) - return nil, err + return nil, fmt.Errorf("getting event types: %v", err) } // Only create EventTypes for Broker sinks. But if there is an EventType and the src has a non-Broker sink @@ -297,7 +282,7 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ if current != nil { if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) - return nil, err + return nil, fmt.Errorf("deleting event type: %v", err) } } // No current and no error. @@ -306,24 +291,22 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ expected := resources.MakeEventType(src) if current != nil { - if !equality.Semantic.DeepEqual(expected.Spec, current.Spec) { - // As is immutable, delete it and create it again. - if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) - return nil, err - } - if current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected); err != nil { - logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current)) - return nil, err - } + if equality.Semantic.DeepEqual(expected.Spec, current.Spec) { + return current, nil + } + // EventTypes are immutable, delete it and create it again. + if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { + logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) + return nil, fmt.Errorf("deleting event type: %v", err) } - return current, nil } - if current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected); err != nil { - return nil, err + current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected) + if err != nil { + logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current)) + return nil, fmt.Errorf("creating event type: %v", err) } - logging.FromContext(ctx).Info("EventType created", zap.Any("eventType", current)) - return current, err + logging.FromContext(ctx).Debug("EventType created", zap.Any("eventType", current)) + return current, nil } func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { @@ -368,8 +351,8 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) r.Logger.Infof("CronJobSource %q became ready after %v", cronjob.Name, duration) r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name)) - if err := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); err != nil { - logging.FromContext(ctx).Sugar().Infof("failed to record ready for CronJobSource, %v", err) + if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil { + logging.FromContext(ctx).Info("Failed to record ready for CronJobSource", zap.Error(recorderErr)) } } diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index 487f787f133..c40884c8a43 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -19,7 +19,6 @@ package cronjobsource import ( "context" "fmt" - "os" "testing" "knative.dev/pkg/configmap" @@ -83,8 +82,6 @@ func init() { _ = appsv1.AddToScheme(scheme.Scheme) _ = corev1.AddToScheme(scheme.Scheme) _ = duckv1alpha1.AddToScheme(scheme.Scheme) - - _ = os.Setenv("CRONJOB_RA_IMAGE", image) } func TestAllCases(t *testing.T) { @@ -396,10 +393,11 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - cronjobLister: listers.GetCronJobSourceLister(), - deploymentLister: listers.GetDeploymentLister(), - eventTypeLister: listers.GetEventTypeLister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + cronjobLister: listers.GetCronJobSourceLister(), + deploymentLister: listers.GetDeploymentLister(), + eventTypeLister: listers.GetEventTypeLister(), + receiveAdapterImage: FinishedEnvLookup(image), } r.sinkReconciler = duck.NewInjectionSinkReconciler(ctx, func(string) {}) return r diff --git a/pkg/reconciler/cronjobsource/env_lookup.go b/pkg/reconciler/cronjobsource/env_lookup.go new file mode 100644 index 00000000000..e7e8d8e15e5 --- /dev/null +++ b/pkg/reconciler/cronjobsource/env_lookup.go @@ -0,0 +1,64 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjobsource + +import ( + "fmt" + "os" + "sync" +) + +// EnvLookup returns the value of the associated environment variable. It only does the lookup once. +// If the environment variable is undefined, then every call to GetValue() will panic. +type EnvLookup interface { + // GetValue returns the value of the associated environment variable or panic if the environment + // variable is not defined. + GetValue() string +} + +// envLookup looks up an environment variable on first usage. If the environment variable is not +// defined, then every call will panic. +type envLookup struct { + once sync.Once + panic error + key string + value string +} + +var _ EnvLookup = (*envLookup)(nil) + +func newEnvLookup(key string) EnvLookup { + return &envLookup{ + key: key, + } +} + +// GetValue implements EnvLookup.GetValue. +func (e *envLookup) GetValue() string { + e.once.Do(func() { + value, defined := os.LookupEnv(e.key) + if !defined { + e.panic = fmt.Errorf("required environment variable %q not defined", e.key) + } else { + e.value = value + } + }) + if e.panic != nil { + panic(e.panic) + } + return e.value +} diff --git a/pkg/reconciler/cronjobsource/env_lookup_test.go b/pkg/reconciler/cronjobsource/env_lookup_test.go new file mode 100644 index 00000000000..7a5d66536af --- /dev/null +++ b/pkg/reconciler/cronjobsource/env_lookup_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Veroute.on 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjobsource + +import ( + "fmt" + "os" + "testing" + + "github.com/google/uuid" +) + +// FinishedEnvLookup creates an envLookup that will return the specified value. +func FinishedEnvLookup(value string) EnvLookup { + e := &envLookup{ + value: value, + } + e.once.Do(func() {}) + return e +} + +// PanickingEnvLookup creates an envLookup that will panic with the specified error. +func PanickingEnvLookup(err error) EnvLookup { + e := &envLookup{ + panic: err, + } + e.once.Do(func() {}) + return e +} + +func TestEnvLookup_GetValue(t *testing.T) { + testCases := map[string]struct { + setEnv bool + value string + }{ + "not set": {}, + "empty value": { + setEnv: true, + }, + "real value": { + setEnv: true, + value: "real value", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + defer func() { + r := recover() + if tc.setEnv { + if r != nil { + t.Errorf("Panic detected when the env should have been set: %v", r) + } + } else { + if r == nil { + t.Errorf("Panic not detected when the env should not have been set") + } + } + }() + k := fmt.Sprintf("random-key-%s", uuid.New().String()) + if tc.setEnv { + if err := os.Setenv(k, tc.value); err != nil { + t.Fatalf("Error setting env %q: %v", k, err) + } + defer func() { + if err := os.Unsetenv(k); err != nil { + t.Fatalf("Error unsetting env %q: %v", k, err) + } + }() + } + e := newEnvLookup(k) + if v := e.GetValue(); v != tc.value { + t.Errorf("Unexpected value. Expected %q. Actually %q", tc.value, v) + } + }) + } +} + +func TestEnvLookup_GetValue_RepeatedPanics(t *testing.T) { + k := fmt.Sprintf("random-key-%s", uuid.New().String()) + e := newEnvLookup(k) + for i := 0; i < 10; i++ { + // Run inside a function to capture the panic. + func() { + defer func() { + r := recover() + if r == nil { + t.Fatalf("Expected a non-nil panic. Was nil at iteration %d", i) + } + }() + e.GetValue() + }() + } +} diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter.go b/pkg/reconciler/cronjobsource/resources/receive_adapter.go index 62815951c9b..f0a08e3e726 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter.go @@ -29,6 +29,11 @@ import ( "github.com/knative/eventing/pkg/apis/sources/v1alpha1" ) +var ( + // one is a form of int32(1) that you can take the address of. + one = int32(1) +) + // ReceiveAdapterArgs are the arguments needed to create a Cron Job Source Receive Adapter. Every // field is required. type ReceiveAdapterArgs struct { @@ -41,8 +46,6 @@ type ReceiveAdapterArgs struct { // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for // Cron Job Sources. func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { - replicas := int32(1) - RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU) if err != nil { RequestResourceCPU = resource.MustParse("250m") @@ -84,7 +87,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Selector: &metav1.LabelSelector{ MatchLabels: args.Labels, }, - Replicas: &replicas, + Replicas: &one, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: args.Labels, From 3c74e78eab2c5a93429c0af3bec2990f77aa7fc5 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 10:04:05 -0700 Subject: [PATCH 02/14] Replace EnvLookup with EnvConfig. --- pkg/reconciler/cronjobsource/controller.go | 22 ++-- .../cronjobsource/controller_test.go | 38 ++++++- pkg/reconciler/cronjobsource/cronjobsource.go | 8 +- .../cronjobsource/cronjobsource_test.go | 12 +- pkg/reconciler/cronjobsource/env_lookup.go | 64 ----------- .../cronjobsource/env_lookup_test.go | 107 ------------------ 6 files changed, 61 insertions(+), 190 deletions(-) delete mode 100644 pkg/reconciler/cronjobsource/env_lookup.go delete mode 100644 pkg/reconciler/cronjobsource/env_lookup_test.go diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index b333eee2ce1..b25e50552f6 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -18,16 +18,17 @@ package cronjobsource import ( "context" + "fmt" + "github.com/kelseyhightower/envconfig" "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + eventtypeinformer "github.com/knative/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" + cronjobsourceinformer "github.com/knative/eventing/pkg/client/injection/informers/sources/v1alpha1/cronjobsource" "github.com/knative/eventing/pkg/duck" "github.com/knative/eventing/pkg/reconciler" "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" - - eventtypeinformer "github.com/knative/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" - cronjobsourceinformer "github.com/knative/eventing/pkg/client/injection/informers/sources/v1alpha1/cronjobsource" deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" ) @@ -50,12 +51,17 @@ func NewController( cronJobSourceInformer := cronjobsourceinformer.Get(ctx) eventTypeInformer := eventtypeinformer.Get(ctx) + env := &env{} + if err := envconfig.Process("", env); err != nil { + panic(fmt.Errorf("unable to process CronJobSource's required environment variables: %v", err)) + } + r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - cronjobLister: cronJobSourceInformer.Lister(), - deploymentLister: deploymentInformer.Lister(), - eventTypeLister: eventTypeInformer.Lister(), - receiveAdapterImage: newEnvLookup(raImageEnvVar), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + cronjobLister: cronJobSourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + eventTypeLister: eventTypeInformer.Lister(), + env: *env, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) r.sinkReconciler = duck.NewInjectionSinkReconciler(ctx, impl.EnqueueKey) diff --git a/pkg/reconciler/cronjobsource/controller_test.go b/pkg/reconciler/cronjobsource/controller_test.go index 41a7a9fa46c..c436dba6e51 100644 --- a/pkg/reconciler/cronjobsource/controller_test.go +++ b/pkg/reconciler/cronjobsource/controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package cronjobsource import ( + "os" "testing" "knative.dev/pkg/configmap" @@ -30,12 +31,41 @@ import ( ) func TestNew(t *testing.T) { + testCases := map[string]struct { + setEnv bool + }{ + "image not set": {}, + "image set": { + setEnv: true, + }, + } defer logtesting.ClearAll() - ctx, _ := SetupFakeContext(t) + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + if tc.setEnv { + if err := os.Setenv("CRONJOB_RA_IMAGE", "anything"); err != nil { + t.Fatalf("Failed to set env var: %v", err) + } + defer func() { + if err := os.Unsetenv("CRONJOB_RA_IMAGE"); err != nil { + t.Fatalf("Failed to unset env var: %v", err) + } + }() + } else { + defer func() { + r := recover() + if r == nil { + t.Errorf("Expected NewController to panic, nothing recovered.") + } + }() + } - c := NewController(ctx, configmap.NewFixedWatcher()) + ctx, _ := SetupFakeContext(t) + c := NewController(ctx, configmap.NewFixedWatcher()) - if c == nil { - t.Fatal("Expected NewController to return a non-nil value") + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") + } + }) } } diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index b7dd51931fa..1c4e5532539 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -60,7 +60,7 @@ const ( type Reconciler struct { *reconciler.Base - receiveAdapterImage EnvLookup + env env // listers index properties about resources cronjobLister listers.CronJobSourceLister @@ -73,6 +73,10 @@ type Reconciler struct { // Check that our Reconciler implements controller.Reconciler var _ controller.Reconciler = (*Reconciler)(nil) +type env struct { + Image string `envconfig:"CRONJOB_RA_IMAGE" required:"true"` +} + // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the CronJobSource // resource with the current status of the resource. @@ -212,7 +216,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro return nil, fmt.Errorf("getting receive adapter: %v", err) } adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.receiveAdapterImage.GetValue(), + Image: r.env.Image, Source: src, Labels: resources.Labels(src.Name), SinkURI: sinkURI, diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index c40884c8a43..b499af6ce19 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -393,11 +393,13 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - cronjobLister: listers.GetCronJobSourceLister(), - deploymentLister: listers.GetDeploymentLister(), - eventTypeLister: listers.GetEventTypeLister(), - receiveAdapterImage: FinishedEnvLookup(image), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + cronjobLister: listers.GetCronJobSourceLister(), + deploymentLister: listers.GetDeploymentLister(), + eventTypeLister: listers.GetEventTypeLister(), + env: env{ + Image: image, + }, } r.sinkReconciler = duck.NewInjectionSinkReconciler(ctx, func(string) {}) return r diff --git a/pkg/reconciler/cronjobsource/env_lookup.go b/pkg/reconciler/cronjobsource/env_lookup.go deleted file mode 100644 index e7e8d8e15e5..00000000000 --- a/pkg/reconciler/cronjobsource/env_lookup.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cronjobsource - -import ( - "fmt" - "os" - "sync" -) - -// EnvLookup returns the value of the associated environment variable. It only does the lookup once. -// If the environment variable is undefined, then every call to GetValue() will panic. -type EnvLookup interface { - // GetValue returns the value of the associated environment variable or panic if the environment - // variable is not defined. - GetValue() string -} - -// envLookup looks up an environment variable on first usage. If the environment variable is not -// defined, then every call will panic. -type envLookup struct { - once sync.Once - panic error - key string - value string -} - -var _ EnvLookup = (*envLookup)(nil) - -func newEnvLookup(key string) EnvLookup { - return &envLookup{ - key: key, - } -} - -// GetValue implements EnvLookup.GetValue. -func (e *envLookup) GetValue() string { - e.once.Do(func() { - value, defined := os.LookupEnv(e.key) - if !defined { - e.panic = fmt.Errorf("required environment variable %q not defined", e.key) - } else { - e.value = value - } - }) - if e.panic != nil { - panic(e.panic) - } - return e.value -} diff --git a/pkg/reconciler/cronjobsource/env_lookup_test.go b/pkg/reconciler/cronjobsource/env_lookup_test.go deleted file mode 100644 index 7a5d66536af..00000000000 --- a/pkg/reconciler/cronjobsource/env_lookup_test.go +++ /dev/null @@ -1,107 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Veroute.on 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cronjobsource - -import ( - "fmt" - "os" - "testing" - - "github.com/google/uuid" -) - -// FinishedEnvLookup creates an envLookup that will return the specified value. -func FinishedEnvLookup(value string) EnvLookup { - e := &envLookup{ - value: value, - } - e.once.Do(func() {}) - return e -} - -// PanickingEnvLookup creates an envLookup that will panic with the specified error. -func PanickingEnvLookup(err error) EnvLookup { - e := &envLookup{ - panic: err, - } - e.once.Do(func() {}) - return e -} - -func TestEnvLookup_GetValue(t *testing.T) { - testCases := map[string]struct { - setEnv bool - value string - }{ - "not set": {}, - "empty value": { - setEnv: true, - }, - "real value": { - setEnv: true, - value: "real value", - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - defer func() { - r := recover() - if tc.setEnv { - if r != nil { - t.Errorf("Panic detected when the env should have been set: %v", r) - } - } else { - if r == nil { - t.Errorf("Panic not detected when the env should not have been set") - } - } - }() - k := fmt.Sprintf("random-key-%s", uuid.New().String()) - if tc.setEnv { - if err := os.Setenv(k, tc.value); err != nil { - t.Fatalf("Error setting env %q: %v", k, err) - } - defer func() { - if err := os.Unsetenv(k); err != nil { - t.Fatalf("Error unsetting env %q: %v", k, err) - } - }() - } - e := newEnvLookup(k) - if v := e.GetValue(); v != tc.value { - t.Errorf("Unexpected value. Expected %q. Actually %q", tc.value, v) - } - }) - } -} - -func TestEnvLookup_GetValue_RepeatedPanics(t *testing.T) { - k := fmt.Sprintf("random-key-%s", uuid.New().String()) - e := newEnvLookup(k) - for i := 0; i < 10; i++ { - // Run inside a function to capture the panic. - func() { - defer func() { - r := recover() - if r == nil { - t.Fatalf("Expected a non-nil panic. Was nil at iteration %d", i) - } - }() - e.GetValue() - }() - } -} From 6586fdd36da6123152fb9fe8a4e528985510502d Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 12:01:14 -0700 Subject: [PATCH 03/14] Rename to envConfig. --- pkg/reconciler/cronjobsource/controller.go | 2 +- pkg/reconciler/cronjobsource/cronjobsource.go | 4 ++-- pkg/reconciler/cronjobsource/cronjobsource_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index b25e50552f6..a548075c08e 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -51,7 +51,7 @@ func NewController( cronJobSourceInformer := cronjobsourceinformer.Get(ctx) eventTypeInformer := eventtypeinformer.Get(ctx) - env := &env{} + env := &envConfig{} if err := envconfig.Process("", env); err != nil { panic(fmt.Errorf("unable to process CronJobSource's required environment variables: %v", err)) } diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 1c4e5532539..e341357f5d0 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -60,7 +60,7 @@ const ( type Reconciler struct { *reconciler.Base - env env + env envConfig // listers index properties about resources cronjobLister listers.CronJobSourceLister @@ -73,7 +73,7 @@ type Reconciler struct { // Check that our Reconciler implements controller.Reconciler var _ controller.Reconciler = (*Reconciler)(nil) -type env struct { +type envConfig struct { Image string `envconfig:"CRONJOB_RA_IMAGE" required:"true"` } diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index b499af6ce19..36b4b222164 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -397,7 +397,7 @@ func TestAllCases(t *testing.T) { cronjobLister: listers.GetCronJobSourceLister(), deploymentLister: listers.GetDeploymentLister(), eventTypeLister: listers.GetEventTypeLister(), - env: env{ + env: envConfig{ Image: image, }, } From b706aa68a7a820f5ece59d2c4566ad85daa7e662 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 12:12:12 -0700 Subject: [PATCH 04/14] podSpecChanged is a helper method. --- pkg/reconciler/cronjobsource/cronjobsource.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index e341357f5d0..b6b767299e0 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -223,7 +223,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro } expected := resources.MakeReceiveAdapter(&adapterArgs) if ra != nil { - if r.podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { + if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { ra.Spec.Template.Spec = expected.Spec.Template.Spec if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { return ra, fmt.Errorf("updating receive adapter: %v", err) @@ -242,7 +242,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro return ra, nil } -func (*Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { +func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) { return true } From 990aade5758eada4fe2ba240da889c105122ea1f Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 12:29:08 -0700 Subject: [PATCH 05/14] Move envConfig to controller.go. --- pkg/reconciler/cronjobsource/controller.go | 7 +++++++ pkg/reconciler/cronjobsource/cronjobsource.go | 6 +----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index a548075c08e..c173e794e1b 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -40,6 +40,13 @@ const ( controllerAgentName = "cronjob-source-controller" ) +// envConfig will be used to extract the required environment variables using +// github.com/kelseyhightower/envconfig. If this configuration cannot be extracted, then +// NewController will panic. +type envConfig struct { + Image string `envconfig:"CRONJOB_RA_IMAGE" required:"true"` +} + // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index b6b767299e0..7cc6b6e285d 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -73,10 +73,6 @@ type Reconciler struct { // Check that our Reconciler implements controller.Reconciler var _ controller.Reconciler = (*Reconciler)(nil) -type envConfig struct { - Image string `envconfig:"CRONJOB_RA_IMAGE" required:"true"` -} - // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the CronJobSource // resource with the current status of the resource. @@ -356,7 +352,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob r.Logger.Infof("CronJobSource %q became ready after %v", cronjob.Name, duration) r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name)) if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil { - logging.FromContext(ctx).Info("Failed to record ready for CronJobSource", zap.Error(recorderErr)) + logging.FromContext(ctx).Error("Failed to record ready for CronJobSource", zap.Error(recorderErr)) } } From 9af6e457c6bb24f1cb67d7e8aa8a18637fd0f95a Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 2 Aug 2019 17:24:43 -0700 Subject: [PATCH 06/14] CronJobSource propagates its receive adapter's status. --- .../sources/v1alpha1/cron_job_lifecycle.go | 25 +++-- .../v1alpha1/cron_job_lifecycle_test.go | 97 ++++++++----------- pkg/reconciler/cronjobsource/cronjobsource.go | 4 +- .../cronjobsource/cronjobsource_test.go | 20 ++-- pkg/reconciler/testing/cronjobsource.go | 2 +- 5 files changed, 68 insertions(+), 80 deletions(-) diff --git a/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go b/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go index fca95d287f6..b8b2926da35 100644 --- a/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go +++ b/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "github.com/knative/eventing/pkg/apis/duck" + appsv1 "k8s.io/api/apps/v1" "knative.dev/pkg/apis" ) @@ -87,19 +89,16 @@ func (s *CronJobSourceStatus) MarkNoSink(reason, messageFormat string, messageA cronJobSourceCondSet.Manage(s).MarkFalse(CronJobConditionSinkProvided, reason, messageFormat, messageA...) } -// MarkDeployed sets the condition that the source has been deployed. -func (s *CronJobSourceStatus) MarkDeployed() { - cronJobSourceCondSet.Manage(s).MarkTrue(CronJobConditionDeployed) -} - -// MarkDeploying sets the condition that the source is deploying. -func (s *CronJobSourceStatus) MarkDeploying(reason, messageFormat string, messageA ...interface{}) { - cronJobSourceCondSet.Manage(s).MarkUnknown(CronJobConditionDeployed, reason, messageFormat, messageA...) -} - -// MarkNotDeployed sets the condition that the source has not been deployed. -func (s *CronJobSourceStatus) MarkNotDeployed(reason, messageFormat string, messageA ...interface{}) { - cronJobSourceCondSet.Manage(s).MarkFalse(CronJobConditionDeployed, reason, messageFormat, messageA...) +// PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if +// CronJobConditionDeployed should be marked as true or false. +func (s *CronJobSourceStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) { + if duck.DeploymentIsAvailable(&d.Status, false) { + apiserverCondSet.Manage(s).MarkTrue(CronJobConditionDeployed) + } else { + // I don't know how to propagate the status well, so just give the name of the Deployment + // for now. + apiserverCondSet.Manage(s).MarkFalse(CronJobConditionDeployed, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + } } // MarkEventType sets the condition that the source has set its event type. diff --git a/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go b/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go index d7154e643ad..a6c3e99d4a6 100644 --- a/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go +++ b/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go @@ -19,6 +19,7 @@ package v1alpha1_test import ( "testing" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "github.com/google/go-cmp/cmp" @@ -27,6 +28,19 @@ import ( "knative.dev/pkg/apis" ) +var ( + availableDeployment = &appsv1.Deployment{ + Status: appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionTrue, + }, + }, + }, + } +) + func TestCronJobSourceStatusIsReady(t *testing.T) { tests := []struct { name string @@ -49,7 +63,7 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s: func() *v1alpha1.CronJobSourceStatus { s := &v1alpha1.CronJobSourceStatus{} s.InitializeConditions() - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), want: false, @@ -86,7 +100,8 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s := &v1alpha1.CronJobSourceStatus{} s.InitializeConditions() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) + s.PropagateDeploymentAvailability(availableDeployment) return s }(), want: false, @@ -107,7 +122,7 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), want: true, @@ -118,7 +133,7 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkEventType() return s }(), @@ -130,8 +145,8 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() - s.MarkNotDeployed("Testing", "") + s.PropagateDeploymentAvailability(availableDeployment) + s.PropagateDeploymentAvailability(&appsv1.Deployment{}) return s }(), want: false, @@ -142,24 +157,11 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkNoEventType("Testing", "") return s }(), want: true, - }, { - name: "mark schedule, sink and not deployed then deploying then deployed", - s: func() *v1alpha1.CronJobSourceStatus { - s := &v1alpha1.CronJobSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink("uri://example") - s.MarkNotDeployed("MarkNotDeployed", "") - s.MarkDeploying("MarkDeploying", "") - s.MarkDeployed() - return s - }(), - want: true, }, { name: "mark schedule validated, sink empty and deployed", s: func() *v1alpha1.CronJobSourceStatus { @@ -167,7 +169,7 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), want: false, @@ -178,7 +180,7 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkSink("uri://example") return s }(), @@ -223,7 +225,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s: func() *v1alpha1.CronJobSourceStatus { s := &v1alpha1.CronJobSourceStatus{} s.InitializeConditions() - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), condQuery: v1alpha1.CronJobConditionReady, @@ -264,7 +266,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), condQuery: v1alpha1.CronJobConditionReady, @@ -279,7 +281,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkEventType() return s }(), @@ -295,7 +297,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkNoSink("Testing", "hi%s", "") return s }(), @@ -313,7 +315,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkInvalidSchedule("Testing", "hi%s", "") return s }(), @@ -331,16 +333,16 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() - s.MarkDeploying("Testing", "hi%s", "") + s.PropagateDeploymentAvailability(availableDeployment) + s.PropagateDeploymentAvailability(&appsv1.Deployment{}) return s }(), condQuery: v1alpha1.CronJobConditionReady, want: &apis.Condition{ Type: v1alpha1.CronJobConditionReady, - Status: corev1.ConditionUnknown, - Reason: "Testing", - Message: "hi", + Status: corev1.ConditionFalse, + Reason: "DeploymentUnavailable", + Message: "The Deployment '' is unavailable.", }, }, { name: "mark schedule, sink and deployed then not deployed", @@ -349,16 +351,16 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() - s.MarkNotDeployed("Testing", "hi%s", "") + s.PropagateDeploymentAvailability(availableDeployment) + s.PropagateDeploymentAvailability(&appsv1.Deployment{}) return s }(), condQuery: v1alpha1.CronJobConditionReady, want: &apis.Condition{ Type: v1alpha1.CronJobConditionReady, Status: corev1.ConditionFalse, - Reason: "Testing", - Message: "hi", + Reason: "DeploymentUnavailable", + Message: "The Deployment '' is unavailable.", }, }, { name: "mark schedule, sink, deployed and event types, then no event types", @@ -367,7 +369,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("uri://example") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkEventType() s.MarkNoEventType("Testing", "hi") return s @@ -377,23 +379,6 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { Type: v1alpha1.CronJobConditionReady, Status: corev1.ConditionTrue, }, - }, { - name: "mark schedule, sink and not deployed then deploying then deployed", - s: func() *v1alpha1.CronJobSourceStatus { - s := &v1alpha1.CronJobSourceStatus{} - s.InitializeConditions() - s.MarkSchedule() - s.MarkSink("uri://example") - s.MarkNotDeployed("MarkNotDeployed", "%s", "") - s.MarkDeploying("MarkDeploying", "%s", "") - s.MarkDeployed() - return s - }(), - condQuery: v1alpha1.CronJobConditionReady, - want: &apis.Condition{ - Type: v1alpha1.CronJobConditionReady, - Status: corev1.ConditionTrue, - }, }, { name: "mark schedule, sink empty and deployed", s: func() *v1alpha1.CronJobSourceStatus { @@ -401,7 +386,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) return s }(), condQuery: v1alpha1.CronJobConditionReady, @@ -418,7 +403,7 @@ func TestCronJobSourceStatusGetCondition(t *testing.T) { s.InitializeConditions() s.MarkSchedule() s.MarkSink("") - s.MarkDeployed() + s.PropagateDeploymentAvailability(availableDeployment) s.MarkSink("uri://example") return s }(), diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 7cc6b6e285d..e36a5ad181f 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -153,12 +153,12 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou } cronjob.Status.MarkSink(sinkURI) - _, err = r.createReceiveAdapter(ctx, cronjob, sinkURI) + ra, err := r.createReceiveAdapter(ctx, cronjob, sinkURI) if err != nil { r.Logger.Error("Unable to create the receive adapter", zap.Error(err)) return fmt.Errorf("creating receive adapter: %v", err) } - cronjob.Status.MarkDeployed() + cronjob.Status.PropagateDeploymentAvailability(ra) _, err = r.reconcileEventType(ctx, cronjob) if err != nil { diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index 36b4b222164..0974904ba8e 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -162,6 +162,7 @@ func TestAllCases(t *testing.T) { WithInitChannelConditions, WithChannelAddress(sinkDNS), ), + makeAvailableReceiveAdapter(sinkRef), }, Key: testNS + "/" + sourceName, WantEvents: []string{ @@ -184,9 +185,6 @@ func TestAllCases(t *testing.T) { WithCronJobSourceEventType, ), }}, - WantCreates: []runtime.Object{ - makeReceiveAdapter(), - }, }, { Name: "valid with event type creation", Objects: []runtime.Object{ @@ -201,6 +199,7 @@ func TestAllCases(t *testing.T) { WithInitBrokerConditions, WithBrokerAddress(sinkDNS), ), + makeAvailableReceiveAdapter(brokerRef), }, Key: testNS + "/" + sourceName, WantEvents: []string{ @@ -231,7 +230,6 @@ func TestAllCases(t *testing.T) { WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), WithEventTypeBroker(sinkName), WithEventTypeOwnerReference(ownerRef)), - makeReceiveAdapterWithSink(brokerRef), }, }, { Name: "valid with event type deletion and creation", @@ -253,6 +251,7 @@ func TestAllCases(t *testing.T) { WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), WithEventTypeBroker(sinkName), WithEventTypeOwnerReference(ownerRef)), + makeAvailableReceiveAdapter(brokerRef), }, Key: testNS + "/" + sourceName, WantEvents: []string{ @@ -286,7 +285,6 @@ func TestAllCases(t *testing.T) { WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), WithEventTypeBroker(sinkName), WithEventTypeOwnerReference(ownerRef)), - makeReceiveAdapterWithSink(brokerRef), }, }, { Name: "valid, existing ra", @@ -302,7 +300,7 @@ func TestAllCases(t *testing.T) { WithInitChannelConditions, WithChannelAddress(sinkDNS), ), - makeReceiveAdapter(), + makeAvailableReceiveAdapter(sinkRef), }, Key: testNS + "/" + sourceName, WantEvents: []string{ @@ -345,7 +343,7 @@ func TestAllCases(t *testing.T) { WithInitChannelConditions, WithChannelAddress(sinkDNS), ), - makeReceiveAdapter(), + makeAvailableReceiveAdapter(sinkRef), }, Key: testNS + "/" + sourceName, WantEvents: []string{ @@ -372,7 +370,7 @@ func TestAllCases(t *testing.T) { WithInitChannelConditions, WithChannelAddress(sinkDNS), ), - makeReceiveAdapter(), + makeAvailableReceiveAdapter(sinkRef), NewEventType("name-1", testNS, WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), @@ -412,6 +410,12 @@ func makeReceiveAdapter() *appsv1.Deployment { return makeReceiveAdapterWithSink(sinkRef) } +func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment { + ra := makeReceiveAdapterWithSink(ref) + WithDeploymentAvailable()(ra) + return ra +} + func makeReceiveAdapterWithSink(ref corev1.ObjectReference) *appsv1.Deployment { source := NewCronSourceJob(sourceName, testNS, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ diff --git a/pkg/reconciler/testing/cronjobsource.go b/pkg/reconciler/testing/cronjobsource.go index 78f3ffbe1fc..5f976a540e5 100644 --- a/pkg/reconciler/testing/cronjobsource.go +++ b/pkg/reconciler/testing/cronjobsource.go @@ -66,7 +66,7 @@ func WithCronJobSourceSink(uri string) CronJobSourceOption { } func WithCronJobSourceDeployed(s *v1alpha1.CronJobSource) { - s.Status.MarkDeployed() + s.Status.PropagateDeploymentAvailability(NewDeployment("any", "any", WithDeploymentAvailable())) } func WithCronJobSourceEventType(s *v1alpha1.CronJobSource) { From 024f320377790bfe7de24f19052395f540914ee0 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 10:31:20 -0700 Subject: [PATCH 07/14] cronJobSourceCondSet, not apiServerCondSet. --- pkg/apis/sources/v1alpha1/cron_job_lifecycle.go | 4 ++-- pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go b/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go index b8b2926da35..9119a425275 100644 --- a/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go +++ b/pkg/apis/sources/v1alpha1/cron_job_lifecycle.go @@ -93,11 +93,11 @@ func (s *CronJobSourceStatus) MarkNoSink(reason, messageFormat string, messageA // CronJobConditionDeployed should be marked as true or false. func (s *CronJobSourceStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) { if duck.DeploymentIsAvailable(&d.Status, false) { - apiserverCondSet.Manage(s).MarkTrue(CronJobConditionDeployed) + cronJobSourceCondSet.Manage(s).MarkTrue(CronJobConditionDeployed) } else { // I don't know how to propagate the status well, so just give the name of the Deployment // for now. - apiserverCondSet.Manage(s).MarkFalse(CronJobConditionDeployed, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + cronJobSourceCondSet.Manage(s).MarkFalse(CronJobConditionDeployed, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) } } diff --git a/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go b/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go index a6c3e99d4a6..8c8db4b98db 100644 --- a/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go +++ b/pkg/apis/sources/v1alpha1/cron_job_lifecycle_test.go @@ -101,7 +101,6 @@ func TestCronJobSourceStatusIsReady(t *testing.T) { s.InitializeConditions() s.MarkSink("uri://example") s.PropagateDeploymentAvailability(availableDeployment) - s.PropagateDeploymentAvailability(availableDeployment) return s }(), want: false, From 7dba0259a8567acde029c8be4e922318ff9eb97d Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 2 Aug 2019 16:02:54 -0700 Subject: [PATCH 08/14] Fixed name EventType. --- pkg/reconciler/cronjobsource/cronjobsource.go | 27 +++++++++++++------ .../cronjobsource/resources/eventtype.go | 8 +++--- .../cronjobsource/resources/labels.go | 11 +++++++- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index e36a5ad181f..348a7bd344c 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -167,6 +167,16 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou } cronjob.Status.MarkEventType() + // TODO Delete this after 0.8 is cut. + oldEventType, err := r.getOldEventType(ctx, cronjob) + if err != nil { + return fmt.Errorf("getting old event type: %v", err) + } else if oldEventType != nil { + if err = r.EventingClientSet.EventingV1alpha1().EventTypes(cronjob.Namespace).Delete(oldEventType.Name, nil); err != nil { + return fmt.Errorf("deleting old event type: %v", err) + } + } + return nil } @@ -255,7 +265,7 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) (*appsv1.Deployment, error) { dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{ - LabelSelector: r.getLabelSelector(src).String(), + LabelSelector: r.getOldLabelSelector(src).String(), }) if err != nil { logging.FromContext(ctx).Error("Unable to list CronJobs: %v", zap.Error(err)) @@ -270,7 +280,8 @@ func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJo } func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { - current, err := r.getEventType(ctx, src) + expected := resources.MakeEventType(src) + current, err := r.eventTypeLister.EventTypes(src.Namespace).Get(expected.Name) if err != nil && !apierrors.IsNotFound(err) { logging.FromContext(ctx).Error("Unable to get an existing event type", zap.Error(err)) return nil, fmt.Errorf("getting event types: %v", err) @@ -289,7 +300,6 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ return nil, nil } - expected := resources.MakeEventType(src) if current != nil { if equality.Semantic.DeepEqual(expected.Spec, current.Spec) { return current, nil @@ -300,6 +310,7 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ return nil, fmt.Errorf("deleting event type: %v", err) } } + current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected) if err != nil { logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current)) @@ -309,9 +320,9 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ return current, nil } -func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { +func (r *Reconciler) getOldEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { etl, err := r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).List(metav1.ListOptions{ - LabelSelector: r.getLabelSelector(src).String(), + LabelSelector: r.getOldLabelSelector(src).String(), }) if err != nil { logging.FromContext(ctx).Error("Unable to list event types: %v", zap.Error(err)) @@ -322,11 +333,11 @@ func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSour return &et, nil } } - return nil, apierrors.NewNotFound(schema.GroupResource{}, "") + return nil, nil } -func (r *Reconciler) getLabelSelector(src *v1alpha1.CronJobSource) labels.Selector { - return labels.SelectorFromSet(resources.Labels(src.Name)) +func (r *Reconciler) getOldLabelSelector(src *v1alpha1.CronJobSource) labels.Selector { + return labels.SelectorFromSet(resources.OldLabels(src.Name)) } func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJobSource) (*v1alpha1.CronJobSource, error) { diff --git a/pkg/reconciler/cronjobsource/resources/eventtype.go b/pkg/reconciler/cronjobsource/resources/eventtype.go index 45d1f6e5660..dc6a86fc232 100644 --- a/pkg/reconciler/cronjobsource/resources/eventtype.go +++ b/pkg/reconciler/cronjobsource/resources/eventtype.go @@ -17,8 +17,6 @@ limitations under the License. package resources import ( - "fmt" - "knative.dev/pkg/kmeta" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -31,9 +29,9 @@ import ( func MakeEventType(src *v1alpha1.CronJobSource) *eventingv1alpha1.EventType { return &eventingv1alpha1.EventType{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(v1alpha1.CronJobEventType)), - Labels: Labels(src.Name), - Namespace: src.Namespace, + Name: utils.GenerateFixedName(src, utils.ToDNS1123Subdomain(v1alpha1.CronJobEventType)), + Labels: Labels(src.Name), + Namespace: src.Namespace, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(src), }, diff --git a/pkg/reconciler/cronjobsource/resources/labels.go b/pkg/reconciler/cronjobsource/resources/labels.go index 04dc6098fda..cd36a5933bb 100644 --- a/pkg/reconciler/cronjobsource/resources/labels.go +++ b/pkg/reconciler/cronjobsource/resources/labels.go @@ -22,9 +22,18 @@ const ( controllerAgentName = "cronjob-source-controller" ) -func Labels(name string) map[string]string { +// OldLabels are the pre-0.8 labels. +// TODO Delete after 0.8 is cut. +func OldLabels(name string) map[string]string { return map[string]string{ "knative-eventing-source": controllerAgentName, "knative-eventing-source-name": name, } } + +// Labels are the labels attached to all resources based on a CronJobSource. +func Labels(name string) map[string]string { + return map[string]string{ + "sources.eventing.knative.dev/cronJobSource": name, + } +} From 9a39910016775853cd545760c3ef95fc5870cc54 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 2 Aug 2019 16:10:00 -0700 Subject: [PATCH 09/14] Fixed name receive adapter. --- pkg/reconciler/cronjobsource/cronjobsource.go | 64 ++++++++++--------- .../resources/receive_adapter.go | 13 ++-- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 348a7bd344c..6a57b7085be 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -23,6 +23,7 @@ import ( "reflect" "time" + status "github.com/knative/eventing/pkg/apis/duck" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/apis/sources/v1alpha1" eventinglisters "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" @@ -40,7 +41,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" appsv1listers "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" "knative.dev/pkg/controller" @@ -160,6 +160,13 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou } cronjob.Status.PropagateDeploymentAvailability(ra) + // TODO Delete this after 0.8 is cut. + if status.DeploymentIsAvailable(&ra.Status, true) { + err = r.deleteOldReceiveAdapter(ctx, cronjob) + if err != nil { + return fmt.Errorf("deleting old receive adapter: %v", err) + } + } _, err = r.reconcileEventType(ctx, cronjob) if err != nil { cronjob.Status.MarkNoEventType("EventTypeReconcileFailed", "") @@ -211,16 +218,10 @@ func checkResourcesStatus(src *v1alpha1.CronJobSource) error { } func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource, sinkURI string) (*appsv1.Deployment, error) { - if err := checkResourcesStatus(src); err != nil { return nil, err } - ra, err := r.getReceiveAdapter(ctx, src) - if err != nil && !apierrors.IsNotFound(err) { - logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err)) - return nil, fmt.Errorf("getting receive adapter: %v", err) - } adapterArgs := resources.ReceiveAdapterArgs{ Image: r.env.Image, Source: src, @@ -228,23 +229,26 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro SinkURI: sinkURI, } expected := resources.MakeReceiveAdapter(&adapterArgs) - if ra != nil { - if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { - ra.Spec.Template.Spec = expected.Spec.Template.Spec - if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { - return ra, fmt.Errorf("updating receive adapter: %v", err) - } - logging.FromContext(ctx).Info("Receive Adapter updated.", zap.Any("receiveAdapter", ra)) + + ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) + r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "Deployment created, error: %v", err) + return ra, err + } else if err != nil { + return nil, fmt.Errorf("error getting receive adapter: %v", err) + } else if !metav1.IsControlledBy(ra, src) { + return nil, fmt.Errorf("deployment %q is not owned by CronJobSource %q", ra.Name, src.Name) + } else if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { + ra.Spec.Template.Spec = expected.Spec.Template.Spec + if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { + return ra, err } - logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) + r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentUpdated, "Deployment %q updated", ra.Name) return ra, nil + } else { + logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) } - - ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) - if err != nil { - return nil, fmt.Errorf("creating receive adapter: %v", err) - } - logging.FromContext(ctx).Info("Receive Adapter created.", zap.Any("receiveAdapter", ra)) return ra, nil } @@ -263,20 +267,22 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { return false } -func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) (*appsv1.Deployment, error) { +func (r *Reconciler) deleteOldReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) error { dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{ - LabelSelector: r.getOldLabelSelector(src).String(), + LabelSelector: labels.SelectorFromSet(resources.OldLabels(src.Name)).String(), }) if err != nil { - logging.FromContext(ctx).Error("Unable to list CronJobs: %v", zap.Error(err)) - return nil, fmt.Errorf("listing CronJobs: %v", err) + return fmt.Errorf("listing old receive adapter: %v", err) } - for _, dep := range dl.Items { - if metav1.IsControlledBy(&dep, src) { - return &dep, nil + for _, ora := range dl.Items { + if metav1.IsControlledBy(&ora, src) { + err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Delete(ora.Name, &metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("deleting old receive adapter %q: %v", ora.Name, err) + } } } - return nil, apierrors.NewNotFound(schema.GroupResource{}, "") + return nil } func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) { diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter.go b/pkg/reconciler/cronjobsource/resources/receive_adapter.go index f0a08e3e726..dd5533b56c1 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter.go @@ -19,14 +19,13 @@ package resources import ( "fmt" - "knative.dev/pkg/kmeta" - + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "github.com/knative/eventing/pkg/utils" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "knative.dev/pkg/kmeta" ) var ( @@ -76,9 +75,9 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Namespace: args.Source.Namespace, - GenerateName: fmt.Sprintf("cronjob-%s-", args.Source.Name), - Labels: args.Labels, + Namespace: args.Source.Namespace, + Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", args.Source.Name)), + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), }, From 7a3976d07947a18aeec5127b80c9db9f9a8d2a44 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 2 Aug 2019 16:57:09 -0700 Subject: [PATCH 10/14] Fix unit tests. --- pkg/reconciler/cronjobsource/cronjobsource.go | 18 +++-- .../cronjobsource/cronjobsource_test.go | 8 ++- .../resources/receive_adapter_test.go | 66 +++---------------- 3 files changed, 26 insertions(+), 66 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 6a57b7085be..9efd20c9d18 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -48,9 +48,11 @@ import ( const ( // Name of the corev1.Events emitted from the reconciliation process - cronjobReconciled = "CronJobSourceReconciled" - cronJobReadinessChanged = "CronJobSourceReadinessChanged" - cronjobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" + cronJobReconciled = "CronJobSourceReconciled" + cronJobReadinessChanged = "CronJobSourceReadinessChanged" + cronJobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" + cronJobSourceDeploymentCreated = "CronJobSurceDeploymentCreated" + cronJobSourceDeploymentUpdated = "CronJobSourceDeploymentUpdated" // raImageEnvVar is the name of the environment variable that contains the receive adapter's // image. It must be defined. @@ -104,12 +106,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { logging.FromContext(ctx).Warn("Error reconciling CronJobSource", zap.Error(err)) } else { logging.FromContext(ctx).Debug("CronJobSource reconciled") - r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronjobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name) + r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronJobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name) } if _, updateStatusErr := r.updateStatus(ctx, cronjob.DeepCopy()); updateStatusErr != nil { logging.FromContext(ctx).Warn("Failed to update the CronJobSource", zap.Error(err)) - r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronjobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err) + r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronJobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err) return updateStatusErr } @@ -233,7 +235,11 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) - r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "Deployment created, error: %v", err) + msg := "Deployment created" + if err != nil { + msg = fmt.Sprintf("Deployment created, error: %v", err) + } + r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "%s", msg) return ra, err } else if err != nil { return nil, fmt.Errorf("error getting receive adapter: %v", err) diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index 7cfaad0d3b2..dc04e427629 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -65,11 +65,13 @@ var ( Controller: &trueVal, BlockOwnerDeletion: &trueVal, } + eventTypeName = fmt.Sprintf("dev.knative.cronjob.event-%s", sourceUID) ) const ( image = "github.com/knative/test/image" sourceName = "test-cronjob-source" + sourceUID = "1234" testNS = "testnamespace" testSchedule = "*/2 * * * *" testData = "data" @@ -223,7 +225,7 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewEventType("", testNS, + NewEventType(eventTypeName, testNS, WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), @@ -245,7 +247,7 @@ func TestAllCases(t *testing.T) { WithInitBrokerConditions, WithBrokerAddress(sinkDNS), ), - NewEventType("name-1", testNS, + NewEventType(eventTypeName, testNS, WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType("type-1"), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -278,7 +280,7 @@ func TestAllCases(t *testing.T) { Name: "name-1", }}, WantCreates: []runtime.Object{ - NewEventType("", testNS, + NewEventType(eventTypeName, testNS, WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go index be84a26ae70..7ad9c9ca7c6 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" v1 "k8s.io/api/apps/v1" @@ -33,6 +34,7 @@ func TestMakeReceiveAdapter(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "source-name", Namespace: "source-namespace", + UID: "source-uid", }, Spec: v1alpha1.CronJobSourceSpec{ ServiceAccountName: "source-svc-acct", @@ -55,8 +57,8 @@ func TestMakeReceiveAdapter(t *testing.T) { yes := true want := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "source-namespace", - GenerateName: "cronjob-source-name-", + Namespace: "source-namespace", + Name: fmt.Sprintf("cronjobsource-%s-%s", src.Name, src.UID), Labels: map[string]string{ "test-key1": "test-value1", "test-key2": "test-value2", @@ -64,7 +66,8 @@ func TestMakeReceiveAdapter(t *testing.T) { OwnerReferences: []metav1.OwnerReference{{ APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "CronJobSource", - Name: "source-name", + Name: src.Name, + UID: src.UID, Controller: &yes, BlockOwnerDeletion: &yes, }}, @@ -130,59 +133,8 @@ func TestMakeReceiveAdapter(t *testing.T) { } if diff, err := kmp.SafeDiff(want, got); err != nil { - t.Errorf("unexpected cron job (-want, +got) = %v", diff) + t.Errorf("unexpected cron job resources (-want, +got) = %v", err) + } else if diff != "" { + t.Errorf("Unexpected deployment (-want +got) = %v", diff) } - src.Spec.Resources = v1alpha1.CronJobResourceSpec{ - Requests: v1alpha1.CronJobRequestsSpec{ - ResourceCPU: "101m", - ResourceMemory: "200Mi", - }, - Limits: v1alpha1.CronJobLimitsSpec{ - ResourceCPU: "102m", - ResourceMemory: "500Mi", - }, - } - want.Spec.Template.Spec.Containers = []corev1.Container{ - { - Name: "receive-adapter", - Image: "test-image", - Env: []corev1.EnvVar{ - { - Name: "SCHEDULE", - Value: "*/2 * * * *", - }, - { - Name: "DATA", - Value: "data", - }, - { - Name: "SINK_URI", - Value: "sink-uri", - }, - { - Name: "NAME", - Value: "source-name", - }, - { - Name: "NAMESPACE", - Value: "source-namespace", - }, - }, - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("101m"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("102m"), - corev1.ResourceMemory: resource.MustParse("500Mi"), - }, - }, - }, - } - - if diff, err := kmp.SafeDiff(want, got); err != nil { - t.Errorf("unexpected cron job resources (-want, +got) = %v", diff) - } - } From 148c7359dd60da834cd93736fc356ff1e1902e1c Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 11:27:50 -0700 Subject: [PATCH 11/14] Log the errors. --- pkg/reconciler/cronjobsource/cronjobsource.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 9efd20c9d18..53113b93c97 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -157,7 +157,7 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou ra, err := r.createReceiveAdapter(ctx, cronjob, sinkURI) if err != nil { - r.Logger.Error("Unable to create the receive adapter", zap.Error(err)) + logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err)) return fmt.Errorf("creating receive adapter: %v", err) } cronjob.Status.PropagateDeploymentAvailability(ra) @@ -304,7 +304,7 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ if src.Spec.Sink.Kind != "Broker" { if current != nil { if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) return nil, fmt.Errorf("deleting event type: %v", err) } } @@ -318,14 +318,14 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ } // EventTypes are immutable, delete it and create it again. if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil { - logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current)) return nil, fmt.Errorf("deleting event type: %v", err) } } current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected) if err != nil { - logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current)) + logging.FromContext(ctx).Error("Error creating event type", zap.Error(err), zap.Any("eventType", expected)) return nil, fmt.Errorf("creating event type: %v", err) } logging.FromContext(ctx).Debug("EventType created", zap.Any("eventType", current)) @@ -372,7 +372,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob cj, err := r.EventingClientSet.SourcesV1alpha1().CronJobSources(desired.Namespace).UpdateStatus(existing) if err == nil && becomesReady { duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) - r.Logger.Infof("CronJobSource %q became ready after %v", cronjob.Name, duration) + logging.FromContext(ctx).Info("CronJobSource because ready after", zap.Duration("duration", duration)) r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name)) if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil { logging.FromContext(ctx).Error("Failed to record ready for CronJobSource", zap.Error(recorderErr)) From 573b3b6f902b911a5785200e509fff231e88b58a Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 11:50:07 -0700 Subject: [PATCH 12/14] Fix unit tests. --- .../cronjobsource/cronjobsource_test.go | 79 +++++++++++++------ pkg/reconciler/testing/cronjobsource.go | 13 +-- pkg/reconciler/testing/eventtype.go | 6 -- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index dc04e427629..d3530142375 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -62,6 +62,7 @@ var ( APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "CronJobSource", Name: sourceName, + UID: sourceUID, Controller: &trueVal, BlockOwnerDeletion: &trueVal, } @@ -99,7 +100,7 @@ func TestAllCases(t *testing.T) { }, { Name: "invalid schedule", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: "invalid schedule", Data: testData, @@ -113,7 +114,7 @@ func TestAllCases(t *testing.T) { // Eventf(corev1.EventTypeWarning, "Fail", ""), // TODO: BUGBUGBUG This should make an event. //}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: "invalid schedule", Data: testData, @@ -127,7 +128,7 @@ func TestAllCases(t *testing.T) { }, { Name: "missing sink", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -138,7 +139,7 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -153,7 +154,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -172,7 +173,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -190,7 +191,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid with event type creation", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -209,7 +210,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -226,7 +227,6 @@ func TestAllCases(t *testing.T) { }}, WantCreates: []runtime.Object{ NewEventType(eventTypeName, testNS, - WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -236,7 +236,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid with event type deletion and creation", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -261,7 +261,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -277,11 +277,10 @@ func TestAllCases(t *testing.T) { ), }}, WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: "name-1", + Name: eventTypeName, }}, WantCreates: []runtime.Object{ NewEventType(eventTypeName, testNS, - WithEventTypeGenerateName(fmt.Sprintf("%s-", utils.ToDNS1123Subdomain(sourcesv1alpha1.CronJobEventType))), WithEventTypeLabels(resources.Labels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), @@ -291,7 +290,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid, existing ra", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -310,7 +309,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReadinessChanged", `CronJobSource %q became ready`, sourceName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewCronSourceJob(sourceName, testNS, + Object: NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -328,7 +327,7 @@ func TestAllCases(t *testing.T) { }, { Name: "valid, no change", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -352,9 +351,9 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "CronJobSourceReconciled", `CronJobSource reconciled: "%s/%s"`, testNS, sourceName), }, }, { - Name: "valid with event type deletion", + Name: "valid with old event type deletion", Objects: []runtime.Object{ - NewCronSourceJob(sourceName, testNS, + NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, @@ -374,7 +373,7 @@ func TestAllCases(t *testing.T) { ), makeAvailableReceiveAdapter(sinkRef), NewEventType("name-1", testNS, - WithEventTypeLabels(resources.Labels(sourceName)), + WithEventTypeLabels(resources.OldLabels(sourceName)), WithEventTypeType(sourcesv1alpha1.CronJobEventType), WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), WithEventTypeBroker(sinkName), @@ -387,6 +386,42 @@ func TestAllCases(t *testing.T) { WantDeletes: []clientgotesting.DeleteActionImpl{{ Name: "name-1", }}, + }, { + Name: "valid with event type deletion", + Objects: []runtime.Object{ + NewCronJobSource(sourceName, testNS, sourceUID, + WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ + Schedule: testSchedule, + Data: testData, + Sink: &sinkRef, + }), + WithInitCronJobSourceConditions, + WithValidCronJobSourceSchedule, + WithValidCronJobSourceResources, + WithValidCronJobSourceResources, + WithCronJobSourceDeployed, + WithCronJobSourceSink(sinkURI), + WithCronJobSourceEventType, + ), + NewChannel(sinkName, testNS, + WithInitChannelConditions, + WithChannelAddress(sinkDNS), + ), + makeAvailableReceiveAdapter(sinkRef), + NewEventType(eventTypeName, testNS, + WithEventTypeLabels(resources.Labels(sourceName)), + WithEventTypeType(sourcesv1alpha1.CronJobEventType), + WithEventTypeSource(sourcesv1alpha1.CronJobEventSource(testNS, sourceName)), + WithEventTypeBroker(sinkName), + WithEventTypeOwnerReference(ownerRef)), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "CronJobSourceReconciled", `CronJobSource reconciled: "%s/%s"`, testNS, sourceName), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{{ + Name: eventTypeName, + }}, }, } @@ -408,10 +443,6 @@ func TestAllCases(t *testing.T) { )) } -func makeReceiveAdapter() *appsv1.Deployment { - return makeReceiveAdapterWithSink(sinkRef) -} - func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment { ra := makeReceiveAdapterWithSink(ref) WithDeploymentAvailable()(ra) @@ -419,7 +450,7 @@ func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment } func makeReceiveAdapterWithSink(ref corev1.ObjectReference) *appsv1.Deployment { - source := NewCronSourceJob(sourceName, testNS, + source := NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, Data: testData, diff --git a/pkg/reconciler/testing/cronjobsource.go b/pkg/reconciler/testing/cronjobsource.go index 5f976a540e5..a4ed9bd13c7 100644 --- a/pkg/reconciler/testing/cronjobsource.go +++ b/pkg/reconciler/testing/cronjobsource.go @@ -19,30 +19,31 @@ package testing import ( "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // CronJobSourceOption enables further configuration of a CronJob. type CronJobSourceOption func(*v1alpha1.CronJobSource) -// NewCronJob creates a CronJob with CronJobOptions -func NewCronSourceJob(name, namespace string, o ...CronJobSourceOption) *v1alpha1.CronJobSource { +// NewCronJobSource creates a CronJobSource with CronJobOptions. +func NewCronJobSource(name, namespace, uid string, o ...CronJobSourceOption) *v1alpha1.CronJobSource { c := &v1alpha1.CronJobSource{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + UID: types.UID(uid), }, } for _, opt := range o { opt(c) } - //c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. + // c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. return c } -// WithInitCronJobConditions initializes the CronJobSource's conditions. +// WithInitCronJobSourceConditions initializes the CronJobSource's conditions. func WithInitCronJobSourceConditions(s *v1alpha1.CronJobSource) { s.Status.InitializeConditions() } diff --git a/pkg/reconciler/testing/eventtype.go b/pkg/reconciler/testing/eventtype.go index 0d31841a7af..521a625ffc0 100644 --- a/pkg/reconciler/testing/eventtype.go +++ b/pkg/reconciler/testing/eventtype.go @@ -47,12 +47,6 @@ func WithInitEventTypeConditions(et *v1alpha1.EventType) { et.Status.InitializeConditions() } -func WithEventTypeGenerateName(generateName string) EventTypeOption { - return func(et *v1alpha1.EventType) { - et.ObjectMeta.GenerateName = generateName - } -} - func WithEventTypeSource(source string) EventTypeOption { return func(et *v1alpha1.EventType) { et.Spec.Source = source From 1b1df855a1daea14a2cbcfd5758a97608950ddbf Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 15:23:27 -0700 Subject: [PATCH 13/14] Remove duplicate function. --- pkg/reconciler/cronjobsource/cronjobsource_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index d3015864d11..d3530142375 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -449,12 +449,6 @@ func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment return ra } -func makeAvailableReceiveAdapter(ref corev1.ObjectReference) *appsv1.Deployment { - ra := makeReceiveAdapterWithSink(ref) - WithDeploymentAvailable()(ra) - return ra -} - func makeReceiveAdapterWithSink(ref corev1.ObjectReference) *appsv1.Deployment { source := NewCronJobSource(sourceName, testNS, sourceUID, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ From e4fad4a8306baf512737dec0469036a25121531d Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 5 Aug 2019 16:36:34 -0700 Subject: [PATCH 14/14] PR comments --- pkg/reconciler/cronjobsource/cronjobsource.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 53113b93c97..aba5f0dfb30 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -181,7 +181,7 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou if err != nil { return fmt.Errorf("getting old event type: %v", err) } else if oldEventType != nil { - if err = r.EventingClientSet.EventingV1alpha1().EventTypes(cronjob.Namespace).Delete(oldEventType.Name, nil); err != nil { + if err = r.EventingClientSet.EventingV1alpha1().EventTypes(cronjob.Namespace).Delete(oldEventType.Name, &metav1.DeleteOptions{}); err != nil { return fmt.Errorf("deleting old event type: %v", err) } } @@ -372,7 +372,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob cj, err := r.EventingClientSet.SourcesV1alpha1().CronJobSources(desired.Namespace).UpdateStatus(existing) if err == nil && becomesReady { duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) - logging.FromContext(ctx).Info("CronJobSource because ready after", zap.Duration("duration", duration)) + logging.FromContext(ctx).Info("CronJobSource became ready after", zap.Duration("duration", duration)) r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name)) if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil { logging.FromContext(ctx).Error("Failed to record ready for CronJobSource", zap.Error(recorderErr))