Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/provisioners/gcppubsub/gcppubsub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ rules:
- get
- list
- watch
- apiGroups:
- "" # Core API group.
resources:
- services
verbs:
- create
- update
- apiGroups:
- networking.istio.io
resources:
Expand Down
36 changes: 22 additions & 14 deletions pkg/provisioners/channel_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,39 @@ func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) {
c.Finalizers = finalizers.List()
}

func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
svcKey := types.NamespacedName{
Namespace: c.Namespace,
Name: ChannelServiceName(c.Name),
}
svc := &corev1.Service{}
err := client.Get(ctx, svcKey, svc)
return svc, err
return createK8sService(ctx, client, svcKey, newK8sService(c))
}

func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
svc, err := getK8sService(ctx, client, c)
func createK8sService(ctx context.Context, client runtimeClient.Client, svcKey types.NamespacedName, svc *corev1.Service) (*corev1.Service, error) {
current := &corev1.Service{}
err := client.Get(ctx, svcKey, current)

if k8serrors.IsNotFound(err) {
svc = newK8sService(c)
err = client.Create(ctx, svc)
}

// If an error occurred in either Get or Create, we need to reconcile again.
if err != nil {
if err != nil {
return nil, err
}
return svc, nil
} else if err != nil {
return nil, err
}

return svc, nil
// spec.clusterIP is immutable and is set on existing services. If we don't set this
// to the same value, we will encounter an error while updating.
svc.Spec.ClusterIP = current.Spec.ClusterIP
if !equality.Semantic.DeepDerivative(svc.Spec, current.Spec) {
current.Spec = svc.Spec
err = client.Update(ctx, current)
if err != nil {
return nil, err
}
}
return current, nil
}

func getVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) {
Expand All @@ -96,8 +105,7 @@ func CreateVirtualService(ctx context.Context, client runtimeClient.Client, chan
return nil, err
}
return virtualService, nil
}
if err != nil {
} else if err != nil {
return nil, err
}

Expand Down
218 changes: 218 additions & 0 deletions pkg/provisioners/channel_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package provisioners

import (
"context"
"errors"
"fmt"
"testing"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"

controllertesting "github.com/knative/eventing/pkg/controller/testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/knative/pkg/apis"
Expand All @@ -25,6 +31,9 @@ const (

var (
truePointer = true

notFound = k8serrors.NewNotFound(corev1.Resource("any"), "any")
testInducedError = errors.New("test-induced-error")
)

func init() {
Expand Down Expand Up @@ -118,6 +127,199 @@ func TestChannelUtils(t *testing.T) {
}
}

func TestCreateK8sService(t *testing.T) {
testCases := map[string]struct {
get controllertesting.MockGet
create controllertesting.MockCreate
update controllertesting.MockUpdate
expected *corev1.Service
err error
}{
"error getting svc": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"not found - create error": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, notFound
},
create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"not found - create succeeds": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, notFound
},
create: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
svc := obj.(*corev1.Service)
svc.Spec = makeTamperedK8sService().Spec
return controllertesting.Handled, nil
},
expected: makeTamperedK8sService(),
},
"different spec - update fails": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
svc := obj.(*corev1.Service)
svc.Spec = corev1.ServiceSpec{
ClusterIP: "set in get",
}
return controllertesting.Handled, nil
},
update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"different spec - update succeeds": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
svc := obj.(*corev1.Service)
svc.Spec = corev1.ServiceSpec{
ClusterIP: "set in get",
}
return controllertesting.Handled, nil
},
update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
svc := obj.(*corev1.Service)
if svc.Spec.ClusterIP != "set in get" {
return controllertesting.Handled, errors.New("clusterIP should have been overwritten with the version returned by get")
}
makeTamperedK8sService().DeepCopyInto(svc)
return controllertesting.Handled, nil
},
expected: makeTamperedK8sService(),
},
"found doesn't need altering": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
svc := obj.(*corev1.Service)
makeK8sService().DeepCopyInto(svc)
return controllertesting.Handled, nil
},
create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("create should not have been called")
},
update: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("update should not have been called")
},
expected: makeK8sService(),
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
client := controllertesting.NewMockClient(fake.NewFakeClient(), controllertesting.Mocks{
MockGets: []controllertesting.MockGet{tc.get},
MockCreates: []controllertesting.MockCreate{tc.create},
MockUpdates: []controllertesting.MockUpdate{tc.update},
})
svc, err := CreateK8sService(context.TODO(), client, getNewChannel())
if tc.err != err {
t.Fatalf("Unexpected error. Expected '%s', actual '%v'", tc.err, err)
}
if diff := cmp.Diff(tc.expected, svc); diff != "" {
t.Fatalf("Unexpected service (-want +got): %s", diff)
}
})
}
}

func TestCreateVirtualService(t *testing.T) {
testCases := map[string]struct {
get controllertesting.MockGet
create controllertesting.MockCreate
update controllertesting.MockUpdate
expected *istiov1alpha3.VirtualService
err error
}{
"error getting svc": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"not found - create error": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, notFound
},
create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"not found - create succeeds": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, notFound
},
create: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
vs := obj.(*istiov1alpha3.VirtualService)
vs.Spec = makeTamperedVirtualService().Spec
return controllertesting.Handled, nil
},
expected: makeTamperedVirtualService(),
},
"different spec - update fails": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
vs := obj.(*istiov1alpha3.VirtualService)
vs.Spec = istiov1alpha3.VirtualServiceSpec{
Gateways: []string{"set in get"},
}
return controllertesting.Handled, nil
},
update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, testInducedError
},
err: testInducedError,
},
"different spec - update succeeds": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
vs := obj.(*istiov1alpha3.VirtualService)
vs.Spec = istiov1alpha3.VirtualServiceSpec{
Gateways: []string{"set in get"},
}
return controllertesting.Handled, nil
},
update: func(_ runtimeClient.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
vs := obj.(*istiov1alpha3.VirtualService)
makeTamperedVirtualService().DeepCopyInto(vs)
return controllertesting.Handled, nil
},
expected: makeTamperedVirtualService(),
},
"found doesn't need altering": {
get: func(_ runtimeClient.Client, _ context.Context, _ runtimeClient.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) {
vs := obj.(*istiov1alpha3.VirtualService)
makeVirtualService().DeepCopyInto(vs)
return controllertesting.Handled, nil
},
create: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("create should not have been called")
},
update: func(_ runtimeClient.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("update should not have been called")
},
expected: makeVirtualService(),
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
client := controllertesting.NewMockClient(fake.NewFakeClient(), controllertesting.Mocks{
MockGets: []controllertesting.MockGet{tc.get},
MockCreates: []controllertesting.MockCreate{tc.create},
MockUpdates: []controllertesting.MockUpdate{tc.update},
})
svc, err := CreateVirtualService(context.TODO(), client, getNewChannel())
if tc.err != err {
t.Fatalf("Unexpected error. Expected '%s', actual '%v'", tc.err, err)
}
if diff := cmp.Diff(tc.expected, svc); diff != "" {
t.Fatalf("Unexpected virtual service (-want +got): %s", diff)
}
})
}
}

func TestAddFinalizer(t *testing.T) {
testCases := map[string]struct {
alreadyPresent bool
Expand Down Expand Up @@ -243,6 +445,14 @@ func makeK8sService() *corev1.Service {
}
}

func makeTamperedK8sService() *corev1.Service {
svc := makeK8sService()
svc.Spec = corev1.ServiceSpec{
ClusterIP: "tampered by the unit tests",
}
return svc
}

func makeVirtualService() *istiov1alpha3.VirtualService {
return &istiov1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -283,3 +493,11 @@ func makeVirtualService() *istiov1alpha3.VirtualService {
},
}
}

func makeTamperedVirtualService() *istiov1alpha3.VirtualService {
vs := makeVirtualService()
vs.Spec = istiov1alpha3.VirtualServiceSpec{
Gateways: []string{"tamped by the unit tests"},
}
return vs
}
29 changes: 1 addition & 28 deletions pkg/provisioners/provisioner_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -26,33 +25,7 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c
Namespace: system.Namespace,
Name: svcName,
}
svc := &corev1.Service{}
err := client.Get(ctx, svcKey, svc)

if errors.IsNotFound(err) {
svc = newDispatcherService(ccp)
err = client.Create(ctx, svc)
if err != nil {
return nil, err
}
return svc, nil
}
if err != nil {
return nil, err
}

expected := newDispatcherService(ccp)
// spec.clusterIP is immutable and is set on existing services. If we don't set this
// to the same value, we will encounter an error while updating.
expected.Spec.ClusterIP = svc.Spec.ClusterIP
if !equality.Semantic.DeepDerivative(expected.Spec, svc.Spec) {
svc.Spec = expected.Spec
err := client.Update(ctx, svc)
if err != nil {
return nil, err
}
}
return svc, nil
return createK8sService(ctx, client, svcKey, newDispatcherService(ccp))
}

func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterChannelProvisioner) error {
Expand Down