Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 69 additions & 46 deletions pkg/reconciler/cronjobsource/cronjobsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"time"

status "github.com/knative/eventing/pkg/apis/duck"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/apis/sources/v1alpha1"
eventinglisters "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1"
Expand All @@ -40,17 +41,18 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
appsv1listers "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
)

const (
// Name of the corev1.Events emitted from the reconciliation process
cronjobReconciled = "CronJobSourceReconciled"
cronJobReadinessChanged = "CronJobSourceReadinessChanged"
cronjobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed"
cronJobReconciled = "CronJobSourceReconciled"
cronJobReadinessChanged = "CronJobSourceReadinessChanged"
cronJobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed"
cronJobSourceDeploymentCreated = "CronJobSurceDeploymentCreated"
cronJobSourceDeploymentUpdated = "CronJobSourceDeploymentUpdated"

// raImageEnvVar is the name of the environment variable that contains the receive adapter's
// image. It must be defined.
Expand Down Expand Up @@ -104,12 +106,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
logging.FromContext(ctx).Warn("Error reconciling CronJobSource", zap.Error(err))
} else {
logging.FromContext(ctx).Debug("CronJobSource reconciled")
r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronjobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name)
r.Recorder.Eventf(cronjob, corev1.EventTypeNormal, cronJobReconciled, `CronJobSource reconciled: "%s/%s"`, cronjob.Namespace, cronjob.Name)
}

if _, updateStatusErr := r.updateStatus(ctx, cronjob.DeepCopy()); updateStatusErr != nil {
logging.FromContext(ctx).Warn("Failed to update the CronJobSource", zap.Error(err))
r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronjobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err)
r.Recorder.Eventf(cronjob, corev1.EventTypeWarning, cronJobUpdateStatusFailed, "Failed to update CronJobSource's status: %v", err)
return updateStatusErr
}

Expand Down Expand Up @@ -155,18 +157,35 @@ func (r *Reconciler) reconcile(ctx context.Context, cronjob *v1alpha1.CronJobSou

ra, err := r.createReceiveAdapter(ctx, cronjob, sinkURI)
if err != nil {
r.Logger.Error("Unable to create the receive adapter", zap.Error(err))
logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err))
return fmt.Errorf("creating receive adapter: %v", err)
}
cronjob.Status.PropagateDeploymentAvailability(ra)

// TODO Delete this after 0.8 is cut.
if status.DeploymentIsAvailable(&ra.Status, true) {
err = r.deleteOldReceiveAdapter(ctx, cronjob)
if err != nil {
return fmt.Errorf("deleting old receive adapter: %v", err)
}
}
_, err = r.reconcileEventType(ctx, cronjob)
if err != nil {
cronjob.Status.MarkNoEventType("EventTypeReconcileFailed", "")
return fmt.Errorf("reconciling event types: %v", err)
}
cronjob.Status.MarkEventType()

// TODO Delete this after 0.8 is cut.
oldEventType, err := r.getOldEventType(ctx, cronjob)
if err != nil {
return fmt.Errorf("getting old event type: %v", err)
} else if oldEventType != nil {
if err = r.EventingClientSet.EventingV1alpha1().EventTypes(cronjob.Namespace).Delete(oldEventType.Name, &metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("deleting old event type: %v", err)
}
}

return nil
}

Expand Down Expand Up @@ -201,40 +220,41 @@ func checkResourcesStatus(src *v1alpha1.CronJobSource) error {
}

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource, sinkURI string) (*appsv1.Deployment, error) {

if err := checkResourcesStatus(src); err != nil {
return nil, err
}

ra, err := r.getReceiveAdapter(ctx, src)
if err != nil && !apierrors.IsNotFound(err) {
logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err))
return nil, fmt.Errorf("getting receive adapter: %v", err)
}
adapterArgs := resources.ReceiveAdapterArgs{
Image: r.env.Image,
Source: src,
Labels: resources.Labels(src.Name),
SinkURI: sinkURI,
}
expected := resources.MakeReceiveAdapter(&adapterArgs)
if ra != nil {
if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil {
return ra, fmt.Errorf("updating receive adapter: %v", err)
}
logging.FromContext(ctx).Info("Receive Adapter updated.", zap.Any("receiveAdapter", ra))

ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
msg := "Deployment created"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this msg... just use it in the Eventf call. Or use it in both places

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I added this is I couldn't find a way to get it to look nice otherwise. Initially I had:

r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "Deployment created: error %v", err)

Which looked fine when there was an error, but looked bad when there wasn't (Deployment created: error <nil>). I like adding the error keyword, which is why I ended up with the current code. Overall, I want two distinct messages, "Deployment created" and "Deployment created, error: %v". Do you have a suggestion on how to accomplish that?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, my bad... Misread the code, they are indeed different logs...

if err != nil {
msg = fmt.Sprintf("Deployment created, error: %v", err)
}
logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra))
r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentCreated, "%s", msg)
return ra, err
} else if err != nil {
return nil, fmt.Errorf("error getting receive adapter: %v", err)
} else if !metav1.IsControlledBy(ra, src) {
return nil, fmt.Errorf("deployment %q is not owned by CronJobSource %q", ra.Name, src.Name)
} else if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil {
return ra, err
}
r.Recorder.Eventf(src, corev1.EventTypeNormal, cronJobSourceDeploymentUpdated, "Deployment %q updated", ra.Name)
return ra, nil
} else {
logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra))
}

ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
if err != nil {
return nil, fmt.Errorf("creating receive adapter: %v", err)
}
logging.FromContext(ctx).Info("Receive Adapter created.", zap.Any("receiveAdapter", ra))
return ra, nil
}

Expand All @@ -253,24 +273,27 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
return false
}

func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) (*appsv1.Deployment, error) {
func (r *Reconciler) deleteOldReceiveAdapter(ctx context.Context, src *v1alpha1.CronJobSource) error {
dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{
LabelSelector: r.getLabelSelector(src).String(),
LabelSelector: labels.SelectorFromSet(resources.OldLabels(src.Name)).String(),
})
if err != nil {
logging.FromContext(ctx).Error("Unable to list CronJobs: %v", zap.Error(err))
return nil, fmt.Errorf("listing CronJobs: %v", err)
return fmt.Errorf("listing old receive adapter: %v", err)
}
for _, dep := range dl.Items {
if metav1.IsControlledBy(&dep, src) {
return &dep, nil
for _, ora := range dl.Items {
if metav1.IsControlledBy(&ora, src) {
err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Delete(ora.Name, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("deleting old receive adapter %q: %v", ora.Name, err)
}
}
}
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
return nil
}

func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) {
current, err := r.getEventType(ctx, src)
expected := resources.MakeEventType(src)
current, err := r.eventTypeLister.EventTypes(src.Namespace).Get(expected.Name)
if err != nil && !apierrors.IsNotFound(err) {
logging.FromContext(ctx).Error("Unable to get an existing event type", zap.Error(err))
return nil, fmt.Errorf("getting event types: %v", err)
Expand All @@ -281,37 +304,37 @@ func (r *Reconciler) reconcileEventType(ctx context.Context, src *v1alpha1.CronJ
if src.Spec.Sink.Kind != "Broker" {
if current != nil {
if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil {
logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current))
logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current))
return nil, fmt.Errorf("deleting event type: %v", err)
}
}
// No current and no error.
return nil, nil
}

expected := resources.MakeEventType(src)
if current != nil {
if equality.Semantic.DeepEqual(expected.Spec, current.Spec) {
return current, nil
}
// EventTypes are immutable, delete it and create it again.
if err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Delete(current.Name, &metav1.DeleteOptions{}); err != nil {
logging.FromContext(ctx).Error("Error deleting existing event type", zap.Any("eventType", current))
logging.FromContext(ctx).Error("Error deleting existing event type", zap.Error(err), zap.Any("eventType", current))
return nil, fmt.Errorf("deleting event type: %v", err)
}
}

current, err = r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).Create(expected)
if err != nil {
logging.FromContext(ctx).Error("Error creating event type", zap.Any("eventType", current))
logging.FromContext(ctx).Error("Error creating event type", zap.Error(err), zap.Any("eventType", expected))
return nil, fmt.Errorf("creating event type: %v", err)
}
logging.FromContext(ctx).Debug("EventType created", zap.Any("eventType", current))
return current, nil
}

func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) {
func (r *Reconciler) getOldEventType(ctx context.Context, src *v1alpha1.CronJobSource) (*eventingv1alpha1.EventType, error) {
etl, err := r.EventingClientSet.EventingV1alpha1().EventTypes(src.Namespace).List(metav1.ListOptions{
LabelSelector: r.getLabelSelector(src).String(),
LabelSelector: r.getOldLabelSelector(src).String(),
})
if err != nil {
logging.FromContext(ctx).Error("Unable to list event types: %v", zap.Error(err))
Expand All @@ -322,11 +345,11 @@ func (r *Reconciler) getEventType(ctx context.Context, src *v1alpha1.CronJobSour
return &et, nil
}
}
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
return nil, nil
}

func (r *Reconciler) getLabelSelector(src *v1alpha1.CronJobSource) labels.Selector {
return labels.SelectorFromSet(resources.Labels(src.Name))
func (r *Reconciler) getOldLabelSelector(src *v1alpha1.CronJobSource) labels.Selector {
return labels.SelectorFromSet(resources.OldLabels(src.Name))
}

func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJobSource) (*v1alpha1.CronJobSource, error) {
Expand All @@ -349,7 +372,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob
cj, err := r.EventingClientSet.SourcesV1alpha1().CronJobSources(desired.Namespace).UpdateStatus(existing)
if err == nil && becomesReady {
duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time)
r.Logger.Infof("CronJobSource %q became ready after %v", cronjob.Name, duration)
logging.FromContext(ctx).Info("CronJobSource became ready after", zap.Duration("duration", duration))
r.Recorder.Event(cronjob, corev1.EventTypeNormal, cronJobReadinessChanged, fmt.Sprintf("CronJobSource %q became ready", cronjob.Name))
if recorderErr := r.StatsReporter.ReportReady("CronJobSource", cronjob.Namespace, cronjob.Name, duration); recorderErr != nil {
logging.FromContext(ctx).Error("Failed to record ready for CronJobSource", zap.Error(recorderErr))
Expand Down
Loading