From ab7623fb4f0e4298657da53be006056e3b8f8f9f Mon Sep 17 00:00:00 2001 From: Grant Rodgers Date: Wed, 1 Aug 2018 17:19:06 -0700 Subject: [PATCH] wip: context args --- pkg/controller/feed/reconcile.go | 103 ++++++++++++++++--------------- 1 file changed, 53 insertions(+), 50 deletions(-) diff --git a/pkg/controller/feed/reconcile.go b/pkg/controller/feed/reconcile.go index 714101fb668..e1eeafcb5bb 100644 --- a/pkg/controller/feed/reconcile.go +++ b/pkg/controller/feed/reconcile.go @@ -46,8 +46,11 @@ const ( // its current state. // If Reconcile returns a non-nil error, the request will be retried. func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + //TODO use this to store the logger and set a deadline + ctx := context.TODO() + feed := &feedsv1alpha1.Feed{} - err := r.client.Get(context.TODO(), request.NamespacedName, feed) + err := r.client.Get(ctx, request.NamespacedName, feed) // The feed may have been deleted since it was added to the workqueue. If so // there's nothing to be done. @@ -69,14 +72,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // state, it means no more can be done for now. In this case the feed will // not be reconciled again until the resync period or a watched resource // changes. - if err = r.reconcile(feed); err != nil { + if err = r.reconcile(ctx, feed); err != nil { glog.Errorf("error reconciling Feed: %v", err) } // Since the reconcile is a sequence of steps, earlier steps may complete // successfully while later steps fail. The Feed is updated on failure to // preserve any useful status or metadata changes the non-failing steps made. - if updateErr := r.updateFeed(feed); updateErr != nil { + if updateErr := r.updateFeed(ctx, feed); updateErr != nil { glog.Errorf("failed to update Feed: %v", updateErr) // An error here means the feed should be reconciled again, regardless of // whether the reconcile was successful or not. @@ -88,12 +91,12 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // reconcile tries to converge the current state of the given Feed to the // desired state. This function should not update the Feed; the calling method // should do that. -func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error { +func (r *reconciler) reconcile(ctx context.Context, feed *feedsv1alpha1.Feed) error { feed.Status.InitializeConditions() // Fetch the EventSource and EventType that is being asked for // and if they don't exist update the status to reflect this back // to the user. - eventSource, eventType, err := r.getFeedSource(feed) + eventSource, eventType, err := r.getFeedSource(ctx, feed) if err != nil { switch t := err.(type) { case *EventSourceError: @@ -133,15 +136,15 @@ func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error { // TODO: Remove this and use finalizers in the EventTypes / EventSources // to do this properly. // TODO: Add issue link here. can't look up right now, no wifi - r.setEventTypeOwnerReference(feed) + r.setEventTypeOwnerReference(ctx, feed) if feed.GetDeletionTimestamp() == nil { - err = r.reconcileStartJob(feed, eventSource, eventType) + err = r.reconcileStartJob(ctx, feed, eventSource, eventType) if err != nil { glog.Errorf("error reconciling start Job: %v", err) } } else { - err = r.reconcileStopJob(feed, eventSource, eventType) + err = r.reconcileStopJob(ctx, feed, eventSource, eventType) if err != nil { glog.Errorf("error reconciling stop Job: %v", err) } @@ -149,7 +152,7 @@ func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error { return nil } -func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error { +func (r *reconciler) reconcileStartJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error { bc := feed.Status.GetCondition(feedsv1alpha1.FeedConditionReady) switch bc.Status { case corev1.ConditionUnknown: @@ -157,9 +160,9 @@ func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alph job := &batchv1.Job{} jobName := resources.JobName(feed) - if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil { if errors.IsNotFound(err) { - job, err = r.createJob(feed, es, et) + job, err = r.createJob(ctx, feed, es, et) if err != nil { return err } @@ -176,7 +179,7 @@ func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alph if resources.IsJobComplete(job) { r.recorder.Eventf(feed, corev1.EventTypeNormal, "StartJobCompleted", "Start job %q completed", job.Name) - if err := r.setFeedContext(feed, job); err != nil { + if err := r.setFeedContext(ctx, feed, job); err != nil { return err } feed.Status.SetCondition(&feedsv1alpha1.FeedCondition{ @@ -201,14 +204,14 @@ func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alph return nil } -func (r *reconciler) reconcileStopJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error { +func (r *reconciler) reconcileStopJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error { if feed.HasFinalizer(finalizerName) { // check for an existing start Job job := &batchv1.Job{} jobName := resources.StartJobName(feed) - err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job) + err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job) if err != nil && !errors.IsNotFound(err) { return err } @@ -219,19 +222,19 @@ func (r *reconciler) reconcileStopJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha // Need to delete pods first to workaround the client's lack of support // for cascading deletes. TODO remove this when client support allows. - if err = r.deleteJobPods(job); err != nil { + if err = r.deleteJobPods(ctx, job); err != nil { return err } - r.client.Delete(context.TODO(), job) + r.client.Delete(ctx, job) glog.Infof("Deleted start job: %s/%s", job.Namespace, job.Name) return nil } jobName = resources.JobName(feed) - if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil { if errors.IsNotFound(err) { - job, err = r.createJob(feed, es, et) + job, err = r.createJob(ctx, feed, es, et) if err != nil { return err } @@ -285,9 +288,9 @@ func (r *reconciler) updateFinalizers(u *feedsv1alpha1.Feed) error { return nil } -func (r *reconciler) updateFeed(u *feedsv1alpha1.Feed) error { +func (r *reconciler) updateFeed(ctx context.Context, u *feedsv1alpha1.Feed) error { feed := &feedsv1alpha1.Feed{} - err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, feed) + err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, feed) if err != nil { return err } @@ -315,7 +318,7 @@ func (r *reconciler) updateFeed(u *feedsv1alpha1.Feed) error { // update the Status block of the Feed resource. UpdateStatus will not // allow changes to the Spec of the resource, which is ideal for ensuring // nothing other than resource status has been updated. - return r.client.Update(context.TODO(), feed) + return r.client.Update(ctx, feed) } func (r *reconciler) getEventTypeName(feed *feedsv1alpha1.Feed) string { @@ -327,19 +330,19 @@ func (r *reconciler) getEventTypeName(feed *feedsv1alpha1.Feed) string { return "" } -func (r *reconciler) setEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error { +func (r *reconciler) setEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error { // TODO(nicholss): need to set the owner on a cluser level event type as well. if len(feed.Spec.Trigger.EventType) > 0 { - return r.setNamespacedEventTypeOwnerReference(feed) + return r.setNamespacedEventTypeOwnerReference(ctx, feed) } else if len(feed.Spec.Trigger.ClusterEventType) > 0 { - return r.setClusterEventTypeOwnerReference(feed) + return r.setClusterEventTypeOwnerReference(ctx, feed) } return nil } -func (r *reconciler) setNamespacedEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error { +func (r *reconciler) setNamespacedEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error { et := &feedsv1alpha1.EventType{} - if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil { if errors.IsNotFound(err) { glog.Errorf("Feed EventType not found, will not set finalizer") return nil @@ -357,9 +360,9 @@ func (r *reconciler) setNamespacedEventTypeOwnerReference(feed *feedsv1alpha1.Fe return nil } -func (r *reconciler) setClusterEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error { +func (r *reconciler) setClusterEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error { et := &feedsv1alpha1.ClusterEventType{} - if err := r.client.Get(context.TODO(), client.ObjectKey{Name: feed.Spec.Trigger.ClusterEventType}, et); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Name: feed.Spec.Trigger.ClusterEventType}, et); err != nil { if errors.IsNotFound(err) { glog.Errorf("Feed ClusterEventType not found, will not set finalizer") return nil @@ -377,7 +380,7 @@ func (r *reconciler) setClusterEventTypeOwnerReference(feed *feedsv1alpha1.Feed) return nil } -func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrigger, error) { +func (r *reconciler) resolveTrigger(ctx context.Context, feed *feedsv1alpha1.Feed) (sources.EventTrigger, error) { trigger := feed.Spec.Trigger resolved := sources.EventTrigger{ Resource: trigger.Resource, @@ -397,7 +400,7 @@ func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrig if trigger.ParametersFrom != nil { glog.Infof("fetching from source %+v", trigger.ParametersFrom) for _, p := range trigger.ParametersFrom { - pfs, err := r.fetchParametersFromSource(feed.Namespace, &p) + pfs, err := r.fetchParametersFromSource(ctx, feed.Namespace, &p) if err != nil { return resolved, err } @@ -409,11 +412,11 @@ func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrig return resolved, nil } -func (r *reconciler) fetchParametersFromSource(namespace string, parametersFrom *feedsv1alpha1.ParametersFromSource) (map[string]interface{}, error) { +func (r *reconciler) fetchParametersFromSource(ctx context.Context, namespace string, parametersFrom *feedsv1alpha1.ParametersFromSource) (map[string]interface{}, error) { var params map[string]interface{} if parametersFrom.SecretKeyRef != nil { glog.Infof("fetching secret %+v", parametersFrom.SecretKeyRef) - data, err := r.fetchSecretKeyValue(namespace, parametersFrom.SecretKeyRef) + data, err := r.fetchSecretKeyValue(ctx, namespace, parametersFrom.SecretKeyRef) if err != nil { return nil, err } @@ -428,9 +431,9 @@ func (r *reconciler) fetchParametersFromSource(namespace string, parametersFrom return params, nil } -func (r *reconciler) fetchSecretKeyValue(namespace string, secretKeyRef *feedsv1alpha1.SecretKeyReference) ([]byte, error) { +func (r *reconciler) fetchSecretKeyValue(ctx context.Context, namespace string, secretKeyRef *feedsv1alpha1.SecretKeyReference) ([]byte, error) { secret := &corev1.Secret{} - err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: secretKeyRef.Name}, secret) + err := r.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretKeyRef.Name}, secret) if err != nil { return nil, err } @@ -445,8 +448,8 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { return parameters, nil } -func (r *reconciler) createJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) (*batchv1.Job, error) { - trigger, err := r.resolveTrigger(feed) +func (r *reconciler) createJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) (*batchv1.Job, error) { + trigger, err := r.resolveTrigger(ctx, feed) if err != nil { return nil, err } @@ -456,19 +459,19 @@ func (r *reconciler) createJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.Event return nil, err } - if err := r.client.Create(context.TODO(), job); err != nil { + if err := r.client.Create(ctx, job); err != nil { return nil, err } return job, nil } -func (r *reconciler) setFeedContext(feed *feedsv1alpha1.Feed, job *batchv1.Job) error { - ctx, err := r.getJobContext(job) +func (r *reconciler) setFeedContext(ctx context.Context, feed *feedsv1alpha1.Feed, job *batchv1.Job) error { + feedContext, err := r.getJobContext(ctx, job) if err != nil { return err } - marshalledFeedContext, err := json.Marshal(&ctx.Context) + marshalledFeedContext, err := json.Marshal(&feedContext.Context) if err != nil { return err } @@ -479,8 +482,8 @@ func (r *reconciler) setFeedContext(feed *feedsv1alpha1.Feed, job *batchv1.Job) return nil } -func (r *reconciler) getJobContext(job *batchv1.Job) (*sources.FeedContext, error) { - pods, err := r.getJobPods(job) +func (r *reconciler) getJobContext(ctx context.Context, job *batchv1.Job) (*sources.FeedContext, error) { + pods, err := r.getJobPods(ctx, job) if err != nil { return nil, err } @@ -504,7 +507,7 @@ func (r *reconciler) getJobContext(job *batchv1.Job) (*sources.FeedContext, erro return &sources.FeedContext{}, nil } -func (r *reconciler) getJobPods(job *batchv1.Job) ([]corev1.Pod, error) { +func (r *reconciler) getJobPods(ctx context.Context, job *batchv1.Job) ([]corev1.Pod, error) { podList := &corev1.PodList{} listOptions := client. InNamespace(job.Namespace). @@ -519,21 +522,21 @@ func (r *reconciler) getJobPods(job *batchv1.Job) ([]corev1.Pod, error) { }, } - if err := r.client.List(context.TODO(), listOptions, podList); err != nil { + if err := r.client.List(ctx, listOptions, podList); err != nil { return nil, err } return podList.Items, nil } -func (r *reconciler) deleteJobPods(job *batchv1.Job) error { - pods, err := r.getJobPods(job) +func (r *reconciler) deleteJobPods(ctx context.Context, job *batchv1.Job) error { + pods, err := r.getJobPods(ctx, job) if err != nil { return err } for _, pod := range pods { - if err := r.client.Delete(context.TODO(), &pod); err != nil { + if err := r.client.Delete(ctx, &pod); err != nil { return err } glog.Infof("Deleted start job pod: %s/%s", pod.Namespace, pod.Name) @@ -544,9 +547,9 @@ func (r *reconciler) deleteJobPods(job *batchv1.Job) error { // getFeedSource gets the EventSource and EventType that the trigger is targeting and // returns them. If either the source or type is not found or is in the deleting state // returns an error of the proper type. -func (r *reconciler) getFeedSource(feed *feedsv1alpha1.Feed) (*feedsv1alpha1.EventSource, *feedsv1alpha1.EventType, error) { +func (r *reconciler) getFeedSource(ctx context.Context, feed *feedsv1alpha1.Feed) (*feedsv1alpha1.EventSource, *feedsv1alpha1.EventType, error) { es := &feedsv1alpha1.EventSource{} - if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.Service}, es); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.Service}, es); err != nil { if errors.IsNotFound(err) { msg := fmt.Sprintf("EventSource %s/%s does not exist", feed.Namespace, feed.Spec.Trigger.Service) glog.Info(msg) @@ -563,7 +566,7 @@ func (r *reconciler) getFeedSource(feed *feedsv1alpha1.Feed) (*feedsv1alpha1.Eve } et := &feedsv1alpha1.EventType{} - if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil { if errors.IsNotFound(err) { msg := fmt.Sprintf("EventType %s/%s does not exist", feed.Namespace, feed.Spec.Trigger.EventType) glog.Info(msg)