From 122e74ca23c5ac7a37165364d871a90a01bc4256 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Tue, 4 Dec 2018 09:22:54 -0800 Subject: [PATCH 1/2] CreateK8sService diffs its expected spec against the actual spec and updates the K8s Service appropriately. --- config/provisioners/gcppubsub/gcppubsub.yaml | 6 + pkg/provisioners/channel_util.go | 20 +- pkg/provisioners/channel_util_test.go | 215 +++++++++++++++++++ 3 files changed, 236 insertions(+), 5 deletions(-) diff --git a/config/provisioners/gcppubsub/gcppubsub.yaml b/config/provisioners/gcppubsub/gcppubsub.yaml index 312d18a4bf6..2f54c806930 100644 --- a/config/provisioners/gcppubsub/gcppubsub.yaml +++ b/config/provisioners/gcppubsub/gcppubsub.yaml @@ -52,7 +52,13 @@ rules: - get - list - watch + - apiGroups: + - "" # Core API group. + resources: + - services + verbs: - create + - update - apiGroups: - networking.istio.io resources: diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index d1fe8f2ea76..3b11f827260 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -65,11 +65,22 @@ func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *event if k8serrors.IsNotFound(err) { svc = newK8sService(c) err = client.Create(ctx, svc) + if err != nil { + return nil, err + } + return svc, nil + } else if err != nil { + return nil, err } - // If an error occurred in either Get or Create, we need to reconcile again. - if err != nil { - return nil, err + // Update the Service if it has changed. + expected := newK8sService(c) + if !equality.Semantic.DeepDerivative(expected.Spec, svc.Spec) { + svc.Spec = expected.Spec + err = client.Update(ctx, svc) + if err != nil { + return nil, err + } } return svc, nil @@ -96,8 +107,7 @@ func CreateVirtualService(ctx context.Context, client runtimeClient.Client, chan return nil, err } return virtualService, nil - } - if err != nil { + } else if err != nil { return nil, err } diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 754b579e4d0..47a35d7e7ea 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -2,9 +2,15 @@ package provisioners import ( "context" + "errors" "fmt" "testing" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + + controllertesting "github.com/knative/eventing/pkg/controller/testing" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/knative/pkg/apis" @@ -25,6 +31,9 @@ const ( var ( truePointer = true + + notFound = k8serrors.NewNotFound(corev1.Resource("any"), "any") + testInducedError = errors.New("test-induced-error") ) func init() { @@ -118,6 +127,196 @@ func TestChannelUtils(t *testing.T) { } } +func TestCreateK8sService(t *testing.T) { + testCases := map[string]struct { + get controllertesting.MockGet + create controllertesting.MockCreate + update controllertesting.MockUpdate + expected *corev1.Service + err error + }{ + "error getting svc": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "not found - create error": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, notFound + }, + create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "not found - create succeeds": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, notFound + }, + create: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + svc := obj.(*corev1.Service) + svc.Spec = makeTamperedK8sService().Spec + return controllertesting.Handled, nil + }, + expected: makeTamperedK8sService(), + }, + "different spec - update fails": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + svc := obj.(*corev1.Service) + svc.Spec = corev1.ServiceSpec{ + ClusterIP: "set in get", + } + return controllertesting.Handled, nil + }, + update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "different spec - update succeeds": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + svc := obj.(*corev1.Service) + svc.Spec = corev1.ServiceSpec{ + ClusterIP: "set in get", + } + return controllertesting.Handled, nil + }, + update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + svc := obj.(*corev1.Service) + makeTamperedK8sService().DeepCopyInto(svc) + return controllertesting.Handled, nil + }, + expected: makeTamperedK8sService(), + }, + "found doesn't need altering": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + svc := obj.(*corev1.Service) + makeK8sService().DeepCopyInto(svc) + return controllertesting.Handled, nil + }, + create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("create should not have been called") + }, + update: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("update should not have been called") + }, + expected: makeK8sService(), + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + client := controllertesting.NewMockClient(fake.NewFakeClient(), controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{tc.get}, + MockCreates: []controllertesting.MockCreate{tc.create}, + MockUpdates: []controllertesting.MockUpdate{tc.update}, + }) + svc, err := CreateK8sService(context.TODO(), client, getNewChannel()) + if tc.err != err { + t.Fatalf("Unexpected error. Expected '%s', actual '%v'", tc.err, err) + } + if diff := cmp.Diff(tc.expected, svc); diff != "" { + t.Fatalf("Unexpected service (-want +got): %s", diff) + } + }) + } +} + +func TestCreateVirtualService(t *testing.T) { + testCases := map[string]struct { + get controllertesting.MockGet + create controllertesting.MockCreate + update controllertesting.MockUpdate + expected *istiov1alpha3.VirtualService + err error + }{ + "error getting svc": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "not found - create error": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, notFound + }, + create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "not found - create succeeds": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, notFound + }, + create: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + vs := obj.(*istiov1alpha3.VirtualService) + vs.Spec = makeTamperedVirtualService().Spec + return controllertesting.Handled, nil + }, + expected: makeTamperedVirtualService(), + }, + "different spec - update fails": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + vs := obj.(*istiov1alpha3.VirtualService) + vs.Spec = istiov1alpha3.VirtualServiceSpec{ + Gateways: []string{"set in get"}, + } + return controllertesting.Handled, nil + }, + update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, testInducedError + }, + err: testInducedError, + }, + "different spec - update succeeds": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + vs := obj.(*istiov1alpha3.VirtualService) + vs.Spec = istiov1alpha3.VirtualServiceSpec{ + Gateways: []string{"set in get"}, + } + return controllertesting.Handled, nil + }, + update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + vs := obj.(*istiov1alpha3.VirtualService) + makeTamperedVirtualService().DeepCopyInto(vs) + return controllertesting.Handled, nil + }, + expected: makeTamperedVirtualService(), + }, + "found doesn't need altering": { + get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + vs := obj.(*istiov1alpha3.VirtualService) + makeVirtualService().DeepCopyInto(vs) + return controllertesting.Handled, nil + }, + create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("create should not have been called") + }, + update: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("update should not have been called") + }, + expected: makeVirtualService(), + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + client := controllertesting.NewMockClient(fake.NewFakeClient(), controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{tc.get}, + MockCreates: []controllertesting.MockCreate{tc.create}, + MockUpdates: []controllertesting.MockUpdate{tc.update}, + }) + svc, err := CreateVirtualService(context.TODO(), client, getNewChannel()) + if tc.err != err { + t.Fatalf("Unexpected error. Expected '%s', actual '%v'", tc.err, err) + } + if diff := cmp.Diff(tc.expected, svc); diff != "" { + t.Fatalf("Unexpected virtual service (-want +got): %s", diff) + } + }) + } +} + func TestAddFinalizer(t *testing.T) { testCases := map[string]struct { alreadyPresent bool @@ -243,6 +442,14 @@ func makeK8sService() *corev1.Service { } } +func makeTamperedK8sService() *corev1.Service { + svc := makeK8sService() + svc.Spec = corev1.ServiceSpec{ + ClusterIP: "tampered by the unit tests", + } + return svc +} + func makeVirtualService() *istiov1alpha3.VirtualService { return &istiov1alpha3.VirtualService{ ObjectMeta: metav1.ObjectMeta{ @@ -283,3 +490,11 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, } } + +func makeTamperedVirtualService() *istiov1alpha3.VirtualService { + vs := makeVirtualService() + vs.Spec = istiov1alpha3.VirtualServiceSpec{ + Gateways: []string{"tamped by the unit tests"}, + } + return vs +} From c6f30001524c20f7548e6231686d757bc941358f Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Tue, 4 Dec 2018 09:37:01 -0800 Subject: [PATCH 2/2] Unify CreateK8sService and CreateDispatcherService --- pkg/provisioners/channel_util.go | 26 +++++++++++------------- pkg/provisioners/channel_util_test.go | 3 +++ pkg/provisioners/provisioner_util.go | 29 +-------------------------- 3 files changed, 16 insertions(+), 42 deletions(-) diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index 3b11f827260..c73cefbed75 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -49,21 +49,19 @@ func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { c.Finalizers = finalizers.List() } -func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svcKey := types.NamespacedName{ Namespace: c.Namespace, Name: ChannelServiceName(c.Name), } - svc := &corev1.Service{} - err := client.Get(ctx, svcKey, svc) - return svc, err + return createK8sService(ctx, client, svcKey, newK8sService(c)) } -func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { - svc, err := getK8sService(ctx, client, c) +func createK8sService(ctx context.Context, client runtimeClient.Client, svcKey types.NamespacedName, svc *corev1.Service) (*corev1.Service, error) { + current := &corev1.Service{} + err := client.Get(ctx, svcKey, current) if k8serrors.IsNotFound(err) { - svc = newK8sService(c) err = client.Create(ctx, svc) if err != nil { return nil, err @@ -73,17 +71,17 @@ func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *event return nil, err } - // Update the Service if it has changed. - expected := newK8sService(c) - if !equality.Semantic.DeepDerivative(expected.Spec, svc.Spec) { - svc.Spec = expected.Spec - err = client.Update(ctx, svc) + // spec.clusterIP is immutable and is set on existing services. If we don't set this + // to the same value, we will encounter an error while updating. + svc.Spec.ClusterIP = current.Spec.ClusterIP + if !equality.Semantic.DeepDerivative(svc.Spec, current.Spec) { + current.Spec = svc.Spec + err = client.Update(ctx, current) if err != nil { return nil, err } } - - return svc, nil + return current, nil } func getVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 47a35d7e7ea..294bfbf7acd 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -184,6 +184,9 @@ func TestCreateK8sService(t *testing.T) { }, update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { svc := obj.(*corev1.Service) + if svc.Spec.ClusterIP != "set in get" { + return controllertesting.Handled, errors.New("clusterIP should have been overwritten with the version returned by get") + } makeTamperedK8sService().DeepCopyInto(svc) return controllertesting.Handled, nil }, diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index 461fef3dfd9..b40d4e27304 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -6,7 +6,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -26,33 +25,7 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c Namespace: system.Namespace, Name: svcName, } - svc := &corev1.Service{} - err := client.Get(ctx, svcKey, svc) - - if errors.IsNotFound(err) { - svc = newDispatcherService(ccp) - err = client.Create(ctx, svc) - if err != nil { - return nil, err - } - return svc, nil - } - if err != nil { - return nil, err - } - - expected := newDispatcherService(ccp) - // spec.clusterIP is immutable and is set on existing services. If we don't set this - // to the same value, we will encounter an error while updating. - expected.Spec.ClusterIP = svc.Spec.ClusterIP - if !equality.Semantic.DeepDerivative(expected.Spec, svc.Spec) { - svc.Spec = expected.Spec - err := client.Update(ctx, svc) - if err != nil { - return nil, err - } - } - return svc, nil + return createK8sService(ctx, client, svcKey, newDispatcherService(ccp)) } func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterChannelProvisioner) error {