Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 74 additions & 46 deletions pkg/reconciler/containersource/containersource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
appsv1listers "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"

status "github.com/knative/eventing/pkg/apis/duck"
"github.com/knative/eventing/pkg/apis/sources/v1alpha1"
listers "github.com/knative/eventing/pkg/client/listers/sources/v1alpha1"
"github.com/knative/eventing/pkg/duck"
Expand Down Expand Up @@ -148,44 +148,20 @@ func (r *Reconciler) reconcile(ctx context.Context, source *v1alpha1.ContainerSo
return err
}

deploy, err := r.getDeployment(ctx, source)
ra, err := r.reconcileReceiveAdapter(ctx, source, args)
if err != nil {
if apierrors.IsNotFound(err) {
deploy, err = r.createDeployment(ctx, source, args)
if err != nil {
r.markNotDeployedRecordEvent(source, corev1.EventTypeWarning, "DeploymentCreateFailed", "Could not create deployment: %v", err)
return err
}
r.markDeployingAndRecordEvent(source, corev1.EventTypeNormal, "DeploymentCreated", "Created deployment %q", deploy.Name)
// Since the Deployment has just been created, there's nothing more
// to do until it gets a status. This ContainerSource will be reconciled
// again when the Deployment is updated.
return nil
}
// Something unexpected happened getting the deployment.
r.markDeployingAndRecordEvent(source, corev1.EventTypeWarning, "DeploymentGetFailed", "Error getting deployment: %v", err)
return err
return fmt.Errorf("reconciling receive adapter: %v", err)
}

// Update Deployment spec if it's changed
expected := resources.MakeDeployment(args)
// Since the Deployment spec has fields defaulted by the webhook, it won't
// be equal to expected. Use DeepDerivative to compare only the fields that
// are set in expected.
if !equality.Semantic.DeepDerivative(expected.Spec, deploy.Spec) {
deploy.Spec = expected.Spec
if _, err = r.KubeClientSet.AppsV1().Deployments(deploy.Namespace).Update(deploy); err != nil {
r.markDeployingAndRecordEvent(source, corev1.EventTypeWarning, "DeploymentUpdateFailed", "Failed to update deployment %q: %v", deploy.Name, err)
return fmt.Errorf("updating deployment: %v", err)
// TODO Delete this after 0.8 is cut.
if status.DeploymentIsAvailable(&ra.Status, false) {
err = r.deleteOldReceiveAdapter(ctx, source, ra.Name)
if err != nil {
return fmt.Errorf("deleting old receive adapter: %v", err)
}
r.markDeployingAndRecordEvent(source, corev1.EventTypeNormal, "DeploymentUpdated", "Updated deployment %q", deploy.Name)
return nil
}

// Update source status
if deploy.Status.ReadyReplicas > 0 && !source.Status.IsDeployed() {
if status.DeploymentIsAvailable(&ra.Status, false) {
source.Status.MarkDeployed()
r.Recorder.Eventf(source, corev1.EventTypeNormal, "DeploymentReady", "Deployment %q has %d ready replicas", deploy.Name, deploy.Status.ReadyReplicas)
r.Recorder.Eventf(source, corev1.EventTypeNormal, "DeploymentReady", "Deployment %q has %d ready replicas", ra.Name, ra.Status.ReadyReplicas)
}

return nil
Expand Down Expand Up @@ -244,23 +220,75 @@ func sinkArg(source *v1alpha1.ContainerSource) (string, bool) {
return "", false
}

func (r *Reconciler) getDeployment(ctx context.Context, source *v1alpha1.ContainerSource) (*appsv1.Deployment, error) {
dl, err := r.KubeClientSet.AppsV1().Deployments(source.Namespace).List(metav1.ListOptions{})
if err != nil {
r.Logger.Errorf("Unable to list deployments: %v", zap.Error(err))
return nil, err
func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, src *v1alpha1.ContainerSource, args resources.ContainerArguments) (*appsv1.Deployment, error) {
expected := resources.MakeDeployment(args)

ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
if err != nil {
r.markNotDeployedRecordEvent(src, corev1.EventTypeWarning, "DeploymentCreateFailed", "Could not create deployment: %v", err)
return nil, fmt.Errorf("creating new deployment: %v", err)
}
r.markDeployingAndRecordEvent(src, corev1.EventTypeNormal, "DeploymentCreated", "Created deployment %q", ra.Name)
return ra, nil
} else if err != nil {
r.markDeployingAndRecordEvent(src, corev1.EventTypeWarning, "DeploymentGetFailed", "Error getting deployment: %v", err)
return nil, fmt.Errorf("getting deployment: %v", err)
} else if !metav1.IsControlledBy(ra, src) {
r.markDeployingAndRecordEvent(src, corev1.EventTypeWarning, "DeploymentNotOwned", "Deployment %q is not owned by this ContainerSource", ra.Name)
return nil, fmt.Errorf("deployment %q is not owned by ContainerSource %q", ra.Name, src.Name)
} else if r.podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra)
if err != nil {
return ra, fmt.Errorf("updating deployment: %v", err)
}
return ra, nil
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can get rid of this last else statement and just log

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. For example, if the receive adapter isn't found, then it is created. If we moved the log line outside the else, we would log that we are reusing an existing receive adapter.

logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra))
}
return ra, nil
}

func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
// Since the Deployment spec has fields defaulted by the webhook, it won't
// be equal to expected. Use DeepDerivative to compare only the fields that
// are set in newPodSpec.
if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) {
return true
}
for _, c := range dl.Items {
if metav1.IsControlledBy(&c, source) {
return &c, nil
if len(oldPodSpec.Containers) != len(newPodSpec.Containers) {
return true
}
for i := range newPodSpec.Containers {
if !equality.Semantic.DeepEqual(newPodSpec.Containers[i].Env, oldPodSpec.Containers[i].Env) {
return true
}
}
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
return false
}

func (r *Reconciler) createDeployment(ctx context.Context, source *v1alpha1.ContainerSource, args resources.ContainerArguments) (*appsv1.Deployment, error) {
deployment := resources.MakeDeployment(args)
return r.KubeClientSet.AppsV1().Deployments(source.Namespace).Create(deployment)
// TODO Delete this after 0.8 is cut.
func (r *Reconciler) deleteOldReceiveAdapter(ctx context.Context, src *v1alpha1.ContainerSource, currentName string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a TODO that this should be deleted after the 0.8 cut?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Sadly there were no labels attached to the Deployment itself.
dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("listing old receive adapter: %v", err)
}
for _, ora := range dl.Items {
// Note that this will match the new receive adapter as well.
if metav1.IsControlledBy(&ora, src) {
// Ignore the current receive adapter.
if ora.Name != currentName {
err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Delete(ora.Name, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("deleting old receive adapter %q: %v", ora.Name, err)
}
}
}
}
return nil
}

func (r *Reconciler) markDeployingAndRecordEvent(source *v1alpha1.ContainerSource, evType string, reason string, messageFmt string, args ...interface{}) {
Expand Down
67 changes: 43 additions & 24 deletions pkg/reconciler/containersource/containersource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ var (
sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName()
sinkURI = "http://" + sinkDNS

deploymentName = fmt.Sprintf("containersource-%s-%s", sourceName, sourceUID)

// We cannot take the address of constants, so copy it into a var.
conditionTrue = corev1.ConditionTrue

// TODO: k8s service does not work, fix.
//serviceRef = corev1.ObjectReference{
// Name: sinkName,
Expand Down Expand Up @@ -221,7 +226,7 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment "%s"`, deploymentName), // TODO on noes
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand All @@ -244,7 +249,7 @@ func TestAllCases(t *testing.T) {
// Status Update:
WithInitContainerSourceConditions,
WithContainerSourceSink(sinkURI),
WithContainerSourceDeploying(`Created deployment ""`),
WithContainerSourceDeploying(fmt.Sprintf(`Created deployment "%s"`, deploymentName)),
),
}},
WantCreates: []runtime.Object{
Expand All @@ -253,7 +258,7 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 0, nil, nil),
), nil, nil, nil),
},
}, {
Name: "valid first pass without template",
Expand All @@ -271,7 +276,7 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment "%s"`, deploymentName), // TODO on noes
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand All @@ -284,7 +289,7 @@ func TestAllCases(t *testing.T) {
// Status Update:
WithInitContainerSourceConditions,
WithContainerSourceSink(sinkURI),
WithContainerSourceDeploying(`Created deployment ""`),
WithContainerSourceDeploying(fmt.Sprintf(`Created deployment "%s"`, deploymentName)),
),
}},
WantCreates: []runtime.Object{
Expand All @@ -293,7 +298,7 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 0, nil, nil),
), nil, nil, nil),
},
}, {
Name: "valid, with ready deployment with template",
Expand Down Expand Up @@ -326,11 +331,11 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 1, nil, nil),
), &conditionTrue, nil, nil),
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentReady", `Deployment "" has 1 ready replicas`),
Eventf(corev1.EventTypeNormal, "DeploymentReady", `Deployment "%s" has 1 ready replicas`, deploymentName),
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
Eventf(corev1.EventTypeNormal, "ContainerSourceReadinessChanged", `ContainerSource "test-container-source" became ready`),
},
Expand Down Expand Up @@ -378,11 +383,11 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 1, nil, nil),
), &conditionTrue, nil, nil),
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentReady", `Deployment "" has 1 ready replicas`),
Eventf(corev1.EventTypeNormal, "DeploymentReady", `Deployment "%s" has 1 ready replicas`, deploymentName),
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
Eventf(corev1.EventTypeNormal, "ContainerSourceReadinessChanged", `ContainerSource "test-container-source" became ready`),
},
Expand Down Expand Up @@ -427,7 +432,7 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment "%s"`, deploymentName), // TODO on noes
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand All @@ -452,7 +457,7 @@ func TestAllCases(t *testing.T) {
// Status Update:
WithInitContainerSourceConditions,
WithContainerSourceSink(sinkURI),
WithContainerSourceDeploying(`Created deployment ""`),
WithContainerSourceDeploying(fmt.Sprintf(`Created deployment "%s"`, deploymentName)),
),
}},
WantCreates: []runtime.Object{
Expand All @@ -461,7 +466,7 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 0, map[string]string{"label": "labeled"}, map[string]string{"annotation": "annotated"}),
), nil, map[string]string{"label": "labeled"}, map[string]string{"annotation": "annotated"}),
},
}, {
Name: "valid first pass, with annotations and labels without template",
Expand All @@ -481,7 +486,7 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes
Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment "%s"`, deploymentName), // TODO on noes
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand All @@ -496,7 +501,7 @@ func TestAllCases(t *testing.T) {
// Status Update:
WithInitContainerSourceConditions,
WithContainerSourceSink(sinkURI),
WithContainerSourceDeploying(`Created deployment ""`),
WithContainerSourceDeploying(fmt.Sprintf(`Created deployment "%s"`, deploymentName)),
),
}},
WantCreates: []runtime.Object{
Expand All @@ -505,7 +510,7 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 0, map[string]string{"label": "labeled"}, map[string]string{"annotation": "annotated"}),
), nil, map[string]string{"label": "labeled"}, map[string]string{"annotation": "annotated"}),
},
}, {
Name: "error for create deployment",
Expand Down Expand Up @@ -548,7 +553,7 @@ func TestAllCases(t *testing.T) {
DeprecatedImage: image,
}),
WithContainerSourceUID(sourceUID),
), 0, nil, nil),
), nil, nil, nil),
},
},
//{ // TODO: k8s service does not work, fix.
Expand Down Expand Up @@ -605,31 +610,47 @@ func TestAllCases(t *testing.T) {
))
}

func makeDeployment(source *sourcesv1alpha1.ContainerSource, replicas int32, labels map[string]string, annotations map[string]string) *appsv1.Deployment {
func makeDeployment(source *sourcesv1alpha1.ContainerSource, available *corev1.ConditionStatus, labels map[string]string, annotations map[string]string) *appsv1.Deployment {
args := append(source.Spec.DeprecatedArgs, fmt.Sprintf("--sink=%s", sinkURI))
env := append(source.Spec.DeprecatedEnv, corev1.EnvVar{Name: "SINK", Value: sinkURI})

labs := map[string]string{
"eventing.knative.dev/source": source.Name,
"sources.eventing.knative.dev/containerSource": source.Name,
}
for k, v := range labels {
labs[k] = v
}

status := appsv1.DeploymentStatus{}
if available != nil {
status.Conditions = []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: *available,
},
}
if *available == corev1.ConditionTrue {
status.ReadyReplicas = 1
}
}

return &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: appsv1.SchemeGroupVersion.String(),
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", source.Name),
Name: deploymentName,
Namespace: source.Namespace,
OwnerReferences: getOwnerReferences(),
Labels: map[string]string{
"sources.eventing.knative.dev/containerSource": source.Name,
},
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"eventing.knative.dev/source": source.Name,
"sources.eventing.knative.dev/containerSource": source.Name,
},
},
Template: corev1.PodTemplateSpec{
Expand All @@ -649,9 +670,7 @@ func makeDeployment(source *sourcesv1alpha1.ContainerSource, replicas int32, lab
},
},
},
Status: appsv1.DeploymentStatus{
ReadyReplicas: replicas,
},
Status: status,
}
}

Expand Down
Loading