Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 53 additions & 50 deletions pkg/controller/feed/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ const (
// its current state.
// If Reconcile returns a non-nil error, the request will be retried.
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
//TODO use this to store the logger and set a deadline
ctx := context.TODO()

feed := &feedsv1alpha1.Feed{}
err := r.client.Get(context.TODO(), request.NamespacedName, feed)
err := r.client.Get(ctx, request.NamespacedName, feed)

// The feed may have been deleted since it was added to the workqueue. If so
// there's nothing to be done.
Expand All @@ -69,14 +72,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
// state, it means no more can be done for now. In this case the feed will
// not be reconciled again until the resync period or a watched resource
// changes.
if err = r.reconcile(feed); err != nil {
if err = r.reconcile(ctx, feed); err != nil {
glog.Errorf("error reconciling Feed: %v", err)
}

// Since the reconcile is a sequence of steps, earlier steps may complete
// successfully while later steps fail. The Feed is updated on failure to
// preserve any useful status or metadata changes the non-failing steps made.
if updateErr := r.updateFeed(feed); updateErr != nil {
if updateErr := r.updateFeed(ctx, feed); updateErr != nil {
glog.Errorf("failed to update Feed: %v", updateErr)
// An error here means the feed should be reconciled again, regardless of
// whether the reconcile was successful or not.
Expand All @@ -88,12 +91,12 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
// reconcile tries to converge the current state of the given Feed to the
// desired state. This function should not update the Feed; the calling method
// should do that.
func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error {
func (r *reconciler) reconcile(ctx context.Context, feed *feedsv1alpha1.Feed) error {
feed.Status.InitializeConditions()
// Fetch the EventSource and EventType that is being asked for
// and if they don't exist update the status to reflect this back
// to the user.
eventSource, eventType, err := r.getFeedSource(feed)
eventSource, eventType, err := r.getFeedSource(ctx, feed)
if err != nil {
switch t := err.(type) {
case *EventSourceError:
Expand Down Expand Up @@ -133,15 +136,15 @@ func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error {
// TODO: Remove this and use finalizers in the EventTypes / EventSources
// to do this properly.
// TODO: Add issue link here. can't look up right now, no wifi
r.setEventTypeOwnerReference(feed)
r.setEventTypeOwnerReference(ctx, feed)

if feed.GetDeletionTimestamp() == nil {
err = r.reconcileStartJob(feed, eventSource, eventType)
err = r.reconcileStartJob(ctx, feed, eventSource, eventType)
if err != nil {
glog.Errorf("error reconciling start Job: %v", err)
}
} else {
err = r.reconcileStopJob(feed, eventSource, eventType)
err = r.reconcileStopJob(ctx, feed, eventSource, eventType)
if err != nil {
glog.Errorf("error reconciling stop Job: %v", err)
}
Expand All @@ -151,17 +154,17 @@ func (r *reconciler) reconcile(feed *feedsv1alpha1.Feed) error {

// reconcileStartJob creates a start Job if one doesn't exist, checks the status
// of the start Job, and updates the Feed status accordingly.
func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error {
func (r *reconciler) reconcileStartJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error {
bc := feed.Status.GetCondition(feedsv1alpha1.FeedConditionReady)
switch bc.Status {
case corev1.ConditionUnknown:

job := &batchv1.Job{}
jobName := resources.JobName(feed)

if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil {
if errors.IsNotFound(err) {
job, err = r.createJob(feed, es, et)
job, err = r.createJob(ctx, feed, es, et)
if err != nil {
return err
}
Expand All @@ -178,7 +181,7 @@ func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alph

if resources.IsJobComplete(job) {
r.recorder.Eventf(feed, corev1.EventTypeNormal, "StartJobCompleted", "Start job %q completed", job.Name)
if err := r.setFeedContext(feed, job); err != nil {
if err := r.setFeedContext(ctx, feed, job); err != nil {
return err
}
feed.Status.SetCondition(&feedsv1alpha1.FeedCondition{
Expand Down Expand Up @@ -206,14 +209,14 @@ func (r *reconciler) reconcileStartJob(feed *feedsv1alpha1.Feed, es *feedsv1alph
// reconcileStopJob deletes the start Job if it exists, creates a stop Job if
// one doesn't exist, checks the status of the stop Job, and updates the Feed
// status accordingly.
func (r *reconciler) reconcileStopJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error {
func (r *reconciler) reconcileStopJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) error {
if feed.HasFinalizer(finalizerName) {

// check for an existing start Job
job := &batchv1.Job{}
jobName := resources.StartJobName(feed)

err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job)
err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job)
if err != nil && !errors.IsNotFound(err) {
return err
}
Expand All @@ -224,19 +227,19 @@ func (r *reconciler) reconcileStopJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha

// Need to delete pods first to workaround the client's lack of support
// for cascading deletes. TODO remove this when client support allows.
if err = r.deleteJobPods(job); err != nil {
if err = r.deleteJobPods(ctx, job); err != nil {
return err
}
r.client.Delete(context.TODO(), job)
r.client.Delete(ctx, job)
glog.Infof("Deleted start job: %s/%s", job.Namespace, job.Name)
return nil
}

jobName = resources.JobName(feed)

if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: jobName}, job); err != nil {
if errors.IsNotFound(err) {
job, err = r.createJob(feed, es, et)
job, err = r.createJob(ctx, feed, es, et)
if err != nil {
return err
}
Expand Down Expand Up @@ -278,9 +281,9 @@ func (r *reconciler) reconcileStopJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha

// updateFeed updates the given Feed's owner references, finalizers, and status
// to the given values. It skips the update if none of those values would change.
func (r *reconciler) updateFeed(u *feedsv1alpha1.Feed) error {
func (r *reconciler) updateFeed(ctx context.Context, u *feedsv1alpha1.Feed) error {
feed := &feedsv1alpha1.Feed{}
err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, feed)
err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, feed)
if err != nil {
return err
}
Expand Down Expand Up @@ -308,7 +311,7 @@ func (r *reconciler) updateFeed(u *feedsv1alpha1.Feed) error {
// update the Status block of the Feed resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
return r.client.Update(context.TODO(), feed)
return r.client.Update(ctx, feed)
}

// getEventTypeName returns the name of the Feed's referenced EventType or
Expand All @@ -324,21 +327,21 @@ func (r *reconciler) getEventTypeName(feed *feedsv1alpha1.Feed) string {

// setEventTypeOwnerReference makes the given Feed's referenced EventType or
// ClusterEventType a non-controlling owner of the Feed.
func (r *reconciler) setEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error {
func (r *reconciler) setEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error {
// TODO(nicholss): need to set the owner on a cluser level event type as well.
if len(feed.Spec.Trigger.EventType) > 0 {
return r.setNamespacedEventTypeOwnerReference(feed)
return r.setNamespacedEventTypeOwnerReference(ctx, feed)
} else if len(feed.Spec.Trigger.ClusterEventType) > 0 {
return r.setClusterEventTypeOwnerReference(feed)
return r.setClusterEventTypeOwnerReference(ctx, feed)
}
return nil
}

// setEventTypeOwnerReference makes the given Feed's referenced EventType a
// non-controlling owner of the Feed.
func (r *reconciler) setNamespacedEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error {
func (r *reconciler) setNamespacedEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error {
et := &feedsv1alpha1.EventType{}
if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil {
if errors.IsNotFound(err) {
glog.Errorf("Feed EventType not found, will not set finalizer")
return nil
Expand All @@ -358,9 +361,9 @@ func (r *reconciler) setNamespacedEventTypeOwnerReference(feed *feedsv1alpha1.Fe

// setEventTypeOwnerReference makes the given Feed's referenced ClusterEventType
// a non-controlling owner of the Feed.
func (r *reconciler) setClusterEventTypeOwnerReference(feed *feedsv1alpha1.Feed) error {
func (r *reconciler) setClusterEventTypeOwnerReference(ctx context.Context, feed *feedsv1alpha1.Feed) error {
et := &feedsv1alpha1.ClusterEventType{}
if err := r.client.Get(context.TODO(), client.ObjectKey{Name: feed.Spec.Trigger.ClusterEventType}, et); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Name: feed.Spec.Trigger.ClusterEventType}, et); err != nil {
if errors.IsNotFound(err) {
glog.Errorf("Feed ClusterEventType not found, will not set finalizer")
return nil
Expand All @@ -380,7 +383,7 @@ func (r *reconciler) setClusterEventTypeOwnerReference(feed *feedsv1alpha1.Feed)

// resolveTrigger extracts the trigger from the Feed, reifies the parameters,
// and turns it all into an EventTrigger.
func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrigger, error) {
func (r *reconciler) resolveTrigger(ctx context.Context, feed *feedsv1alpha1.Feed) (sources.EventTrigger, error) {
trigger := feed.Spec.Trigger
resolved := sources.EventTrigger{
Resource: trigger.Resource,
Expand All @@ -400,7 +403,7 @@ func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrig
if trigger.ParametersFrom != nil {
glog.Infof("fetching from source %+v", trigger.ParametersFrom)
for _, p := range trigger.ParametersFrom {
pfs, err := r.fetchParametersFromSource(feed.Namespace, &p)
pfs, err := r.fetchParametersFromSource(ctx, feed.Namespace, &p)
if err != nil {
return resolved, err
}
Expand All @@ -414,11 +417,11 @@ func (r *reconciler) resolveTrigger(feed *feedsv1alpha1.Feed) (sources.EventTrig

// fetchParametersFromSource gets the secret value referenced by the given
// ParametersFrom and returns it as a string-keyed map.
func (r *reconciler) fetchParametersFromSource(namespace string, parametersFrom *feedsv1alpha1.ParametersFromSource) (map[string]interface{}, error) {
func (r *reconciler) fetchParametersFromSource(ctx context.Context, namespace string, parametersFrom *feedsv1alpha1.ParametersFromSource) (map[string]interface{}, error) {
var params map[string]interface{}
if parametersFrom.SecretKeyRef != nil {
glog.Infof("fetching secret %+v", parametersFrom.SecretKeyRef)
data, err := r.fetchSecretKeyValue(namespace, parametersFrom.SecretKeyRef)
data, err := r.fetchSecretKeyValue(ctx, namespace, parametersFrom.SecretKeyRef)
if err != nil {
return nil, err
}
Expand All @@ -432,9 +435,9 @@ func (r *reconciler) fetchParametersFromSource(namespace string, parametersFrom

// fetchSecretKeyValue gets the Secret referenced and returns the data in the
// referenced key.
func (r *reconciler) fetchSecretKeyValue(namespace string, secretKeyRef *feedsv1alpha1.SecretKeyReference) ([]byte, error) {
func (r *reconciler) fetchSecretKeyValue(ctx context.Context, namespace string, secretKeyRef *feedsv1alpha1.SecretKeyReference) ([]byte, error) {
secret := &corev1.Secret{}
err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: secretKeyRef.Name}, secret)
err := r.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretKeyRef.Name}, secret)
if err != nil {
return nil, err
}
Expand All @@ -443,8 +446,8 @@ func (r *reconciler) fetchSecretKeyValue(namespace string, secretKeyRef *feedsv1

// createJob creates a Job for the given Feed based on its current state,
// returning the created Job.
func (r *reconciler) createJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) (*batchv1.Job, error) {
trigger, err := r.resolveTrigger(feed)
func (r *reconciler) createJob(ctx context.Context, feed *feedsv1alpha1.Feed, es *feedsv1alpha1.EventSource, et *feedsv1alpha1.EventType) (*batchv1.Job, error) {
trigger, err := r.resolveTrigger(ctx, feed)
if err != nil {
return nil, err
}
Expand All @@ -454,21 +457,21 @@ func (r *reconciler) createJob(feed *feedsv1alpha1.Feed, es *feedsv1alpha1.Event
return nil, err
}

if err := r.client.Create(context.TODO(), job); err != nil {
if err := r.client.Create(ctx, job); err != nil {
return nil, err
}
return job, nil
}

// setFeedContext sets the Feed's context from the context emitted by the given
// Job.
func (r *reconciler) setFeedContext(feed *feedsv1alpha1.Feed, job *batchv1.Job) error {
ctx, err := r.getJobContext(job)
func (r *reconciler) setFeedContext(ctx context.Context, feed *feedsv1alpha1.Feed, job *batchv1.Job) error {
feedContext, err := r.getJobContext(ctx, job)
if err != nil {
return err
}

marshalledFeedContext, err := json.Marshal(&ctx.Context)
marshalledFeedContext, err := json.Marshal(&feedContext.Context)
if err != nil {
return err
}
Expand All @@ -482,8 +485,8 @@ func (r *reconciler) setFeedContext(feed *feedsv1alpha1.Feed, job *batchv1.Job)
// getJobContext returns the FeedContext emitted by the first successful pod
// owned by this job. The feed context is extracted from the termination
// message of the first container in the pod.
func (r *reconciler) getJobContext(job *batchv1.Job) (*sources.FeedContext, error) {
pods, err := r.getJobPods(job)
func (r *reconciler) getJobContext(ctx context.Context, job *batchv1.Job) (*sources.FeedContext, error) {
pods, err := r.getJobPods(ctx, job)
if err != nil {
return nil, err
}
Expand All @@ -508,7 +511,7 @@ func (r *reconciler) getJobContext(job *batchv1.Job) (*sources.FeedContext, erro
}

// getJobPods returns the array of Pods owned by the given Job.
func (r *reconciler) getJobPods(job *batchv1.Job) ([]corev1.Pod, error) {
func (r *reconciler) getJobPods(ctx context.Context, job *batchv1.Job) ([]corev1.Pod, error) {
podList := &corev1.PodList{}
listOptions := client.
InNamespace(job.Namespace).
Expand All @@ -523,21 +526,21 @@ func (r *reconciler) getJobPods(job *batchv1.Job) ([]corev1.Pod, error) {
},
}

if err := r.client.List(context.TODO(), listOptions, podList); err != nil {
if err := r.client.List(ctx, listOptions, podList); err != nil {
return nil, err
}

return podList.Items, nil
}

func (r *reconciler) deleteJobPods(job *batchv1.Job) error {
pods, err := r.getJobPods(job)
func (r *reconciler) deleteJobPods(ctx context.Context, job *batchv1.Job) error {
pods, err := r.getJobPods(ctx, job)
if err != nil {
return err
}

for _, pod := range pods {
if err := r.client.Delete(context.TODO(), &pod); err != nil {
if err := r.client.Delete(ctx, &pod); err != nil {
return err
}
glog.Infof("Deleted start job pod: %s/%s", pod.Namespace, pod.Name)
Expand All @@ -548,9 +551,9 @@ func (r *reconciler) deleteJobPods(job *batchv1.Job) error {
// getFeedSource gets the EventSource and EventType that the trigger is targeting and
// returns them. If either the source or type is not found or is in the deleting state
// returns an error of the proper type.
func (r *reconciler) getFeedSource(feed *feedsv1alpha1.Feed) (*feedsv1alpha1.EventSource, *feedsv1alpha1.EventType, error) {
func (r *reconciler) getFeedSource(ctx context.Context, feed *feedsv1alpha1.Feed) (*feedsv1alpha1.EventSource, *feedsv1alpha1.EventType, error) {
es := &feedsv1alpha1.EventSource{}
if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.Service}, es); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.Service}, es); err != nil {
if errors.IsNotFound(err) {
msg := fmt.Sprintf("EventSource %s/%s does not exist", feed.Namespace, feed.Spec.Trigger.Service)
glog.Info(msg)
Expand All @@ -567,7 +570,7 @@ func (r *reconciler) getFeedSource(feed *feedsv1alpha1.Feed) (*feedsv1alpha1.Eve
}

et := &feedsv1alpha1.EventType{}
if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil {
if err := r.client.Get(ctx, client.ObjectKey{Namespace: feed.Namespace, Name: feed.Spec.Trigger.EventType}, et); err != nil {
if errors.IsNotFound(err) {
msg := fmt.Sprintf("EventType %s/%s does not exist", feed.Namespace, feed.Spec.Trigger.EventType)
glog.Info(msg)
Expand Down