diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index 941da4fd30e..6fb82837fc9 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -59,6 +59,22 @@ rules: - list - watch - create + - apiGroups: + - "" # Core API group. + resources: + - services + resourceNames: + - in-memory-channel-clusterbus + verbs: + - delete + - apiGroups: + - "" # Core API group. + resources: + - services + resourceNames: + - in-memory-channel-dispatcher + verbs: + - update - apiGroups: - "" # Core API Group. resources: @@ -76,6 +92,7 @@ rules: - list - watch - create + - update --- diff --git a/config/provisioners/kafka/kafka.yaml b/config/provisioners/kafka/kafka.yaml index 9d184f5f42e..6d0926d0226 100644 --- a/config/provisioners/kafka/kafka.yaml +++ b/config/provisioners/kafka/kafka.yaml @@ -51,6 +51,14 @@ rules: - list - watch - create + - apiGroups: + - "" # Core API group. + resources: + - services + resourceNames: + - kafka-dispatcher + verbs: + - update - apiGroups: - "" # Core API Group. resources: @@ -68,6 +76,7 @@ rules: - list - watch - create + - update --- apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/pkg/buses/logger.go b/pkg/buses/logger.go deleted file mode 100644 index ed439cc34ae..00000000000 --- a/pkg/buses/logger.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "github.com/knative/pkg/logging" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -const ( - busLoggingComponent = "bus" - dispatcherLoggingComponent = "dispatcher" - receiverLoggingComponent = "receiver" -) - -// NewLoggingConfig creates a static logging configuration appropriate for a -// bus. All logging levels are set to Info. -func NewLoggingConfig() *logging.Config { - lc := &logging.Config{} - lc.LoggingConfig = `{ - "level": "info", - "development": false, - "outputPaths": ["stdout"], - "errorOutputPaths": ["stderr"], - "encoding": "json", - "encoderConfig": { - "timeKey": "ts", - "levelKey": "level", - "nameKey": "logger", - "callerKey": "caller", - "messageKey": "msg", - "stacktraceKey": "stacktrace", - "lineEnding": "", - "levelEncoder": "", - "timeEncoder": "iso8601", - "durationEncoder": "", - "callerEncoder": "" - } - }` - lc.LoggingLevel = make(map[string]zapcore.Level) - lc.LoggingLevel[busLoggingComponent] = zapcore.InfoLevel - return lc -} - -// NewBusLoggerFromConfig creates a new zap logger for the bus component based -// on the provided configuration -func NewBusLoggerFromConfig(config *logging.Config) *zap.SugaredLogger { - logger, _ := logging.NewLoggerFromConfig(config, busLoggingComponent) - return logger -} diff --git a/pkg/buses/references_test.go b/pkg/buses/references_test.go deleted file mode 100644 index ef75a5070fc..00000000000 --- a/pkg/buses/references_test.go +++ /dev/null @@ -1,95 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package buses_test - -import ( - "fmt" - "testing" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/buses" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - referencesTestNamespace = "test-namespace" - referencesTestChannelName = "test-channel" - referencesTestSubscriptionName = "test-subscription" -) - -func TestNewChannelReference(t *testing.T) { - channel := &eventingv1alpha1.Channel{ - ObjectMeta: metav1.ObjectMeta{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - }, - } - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReference(channel) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestNewChannelReferenceFromSubscription(t *testing.T) { - subscription := &eventingv1alpha1.Subscription{ - ObjectMeta: metav1.ObjectMeta{ - Name: referencesTestSubscriptionName, - Namespace: referencesTestNamespace, - }, - Spec: eventingv1alpha1.SubscriptionSpec{ - Channel: corev1.ObjectReference{ - Name: referencesTestChannelName, - }, - }, - } - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReferenceFromSubscription(subscription) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestNewChannelReferenceFromNames(t *testing.T) { - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReferenceFromNames(referencesTestChannelName, referencesTestNamespace) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestChannelReference_String(t *testing.T) { - ref := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestChannelName) - actual := ref.String() - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 7542a4df80e..e94cda47139 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -661,7 +661,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "in-memory-channel-clusterbus.knative-eventing.svc.cluster.local", + Host: "in-memory-channel-dispatcher.knative-eventing.svc.cluster.local", Port: istiov1alpha3.PortSelector{ Number: util.PortNumber, }, diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go index eb7420627bd..a182e113a92 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go @@ -18,17 +18,20 @@ package clusterchannelprovisioner import ( "context" + "fmt" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/system" ) const ( @@ -139,6 +142,32 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", ccp), zap.Any("service", svc)) } + // The name of the svc has changed since version 0.2.1. Hence, delete old dispatcher service (in-memory-channel-clusterbus) + // that was created previously in version 0.2.0 to ensure backwards compatibility. + err = r.deleteOldDispatcherService(ctx, ccp) + if err != nil { + logger.Info("Error deleting the old ClusterChannelProvisioner's K8s Service", zap.Error(err)) + return err + } + ccp.Status.MarkReady() return nil } + +func (r *reconciler) deleteOldDispatcherService(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error { + svcName := fmt.Sprintf("%s-clusterbus", ccp.Name) + svcKey := types.NamespacedName{ + Namespace: system.Namespace, + Name: svcName, + } + svc := &corev1.Service{} + err := r.client.Get(ctx, svcKey, svc) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + return r.client.Delete(ctx, svc) +} diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go index 030e8dd2025..c22a7bc0488 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go @@ -178,6 +178,20 @@ func TestReconcile(t *testing.T) { makeReadyClusterChannelProvisioner(), }, }, + { + Name: "Delete old dispatcher", + InitialState: []runtime.Object{ + makeClusterChannelProvisioner(), + makeOldK8sService(), + }, + WantPresent: []runtime.Object{ + makeReadyClusterChannelProvisioner(), + makeK8sService(), + }, + WantAbsent: []runtime.Object{ + makeOldK8sService(), + }, + }, { Name: "Create dispatcher - not owned by CCP", InitialState: []runtime.Object{ @@ -279,7 +293,7 @@ func makeK8sService() *corev1.Service { }, ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace, - Name: fmt.Sprintf("%s-clusterbus", Name), + Name: fmt.Sprintf("%s-dispatcher", Name), OwnerReferences: []metav1.OwnerReference{ { APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), @@ -305,6 +319,12 @@ func makeK8sService() *corev1.Service { } } +func makeOldK8sService() *corev1.Service { + svc := makeK8sService() + svc.ObjectMeta.Name = fmt.Sprintf("%s-clusterbus", Name) + return svc +} + func makeK8sServiceNotOwnedByClusterChannelProvisioner() *corev1.Service { svc := makeK8sService() svc.OwnerReferences = nil diff --git a/pkg/controller/eventing/inmemory/controller/main.go b/pkg/controller/eventing/inmemory/controller/main.go index bb0a680a72b..a16db0beb94 100644 --- a/pkg/controller/eventing/inmemory/controller/main.go +++ b/pkg/controller/eventing/inmemory/controller/main.go @@ -20,9 +20,9 @@ import ( "flag" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/buses" "github.com/knative/eventing/pkg/controller/eventing/inmemory/channel" "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterchannelprovisioner" + "github.com/knative/eventing/pkg/provisioners" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/signals" "go.uber.org/zap" @@ -31,8 +31,8 @@ import ( ) func main() { - logConfig := buses.NewLoggingConfig() - logger := buses.NewBusLoggerFromConfig(logConfig) + logConfig := provisioners.NewLoggingConfig() + logger := provisioners.NewProvisionerLoggerFromConfig(logConfig) defer logger.Sync() logger = logger.With( zap.String("eventing.knative.dev/clusterChannelProvisioner", clusterchannelprovisioner.Name), diff --git a/pkg/controller/names.go b/pkg/controller/names.go index 6026013e34b..4f4467c478e 100644 --- a/pkg/controller/names.go +++ b/pkg/controller/names.go @@ -18,22 +18,6 @@ package controller import "fmt" -func ClusterBusDispatcherServiceName(clusterBusName string) string { - return fmt.Sprintf("%s-clusterbus", clusterBusName) -} - -func ChannelVirtualServiceName(channelName string) string { - return fmt.Sprintf("%s-channel", channelName) -} - -func ChannelServiceName(channelName string) string { - return fmt.Sprintf("%s-channel", channelName) -} - -func ChannelHostName(channelName, namespace string) string { - return fmt.Sprintf("%s.%s.channels.cluster.local", channelName, namespace) -} - func ServiceHostName(serviceName, namespace string) string { return fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, namespace) } diff --git a/pkg/controller/names_test.go b/pkg/controller/names_test.go index ca0d3933a25..d44f490f3de 100644 --- a/pkg/controller/names_test.go +++ b/pkg/controller/names_test.go @@ -26,30 +26,6 @@ func TestNames(t *testing.T) { F func() string Want string }{{ - Name: "ClusterBusDispatcherServiceName", - F: func() string { - return ClusterBusDispatcherServiceName("foo") - }, - Want: "foo-clusterbus", - }, { - Name: "ChannelVirtualServiceName", - F: func() string { - return ChannelVirtualServiceName("foo") - }, - Want: "foo-channel", - }, { - Name: "ChannelServiceName", - F: func() string { - return ChannelServiceName("foo") - }, - Want: "foo-channel", - }, { - Name: "ChannelHostName", - F: func() string { - return ChannelHostName("foo", "namespace") - }, - Want: "foo.namespace.channels.cluster.local", - }, { Name: "ServiceHostName", F: func() string { return ServiceHostName("foo", "namespace") diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index 165ed49cea4..54392f4ea5e 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -2,6 +2,7 @@ package provisioners import ( "context" + "fmt" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" @@ -38,7 +39,7 @@ func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svcKey := types.NamespacedName{ Namespace: c.Namespace, - Name: controller.ChannelServiceName(c.Name), + Name: ChannelServiceName(c.Name), } svc := &corev1.Service{} err := client.Get(ctx, svcKey, svc) @@ -64,29 +65,40 @@ func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *event func getVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { vsk := runtimeClient.ObjectKey{ Namespace: c.Namespace, - Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), + Name: ChannelVirtualServiceName(c.ObjectMeta.Name), } vs := &istiov1alpha3.VirtualService{} err := client.Get(ctx, vsk, vs) return vs, err } -func CreateVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { - virtualService, err := getVirtualService(ctx, client, c) +func CreateVirtualService(ctx context.Context, client runtimeClient.Client, channel *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { + virtualService, err := getVirtualService(ctx, client, channel) // If the resource doesn't exist, we'll create it if errors.IsNotFound(err) { - virtualService = newVirtualService(c) + virtualService = newVirtualService(channel) err = client.Create(ctx, virtualService) + if err != nil { + return nil, err + } + return virtualService, nil } - - // If an error occurs during Get/Create, we'll requeue the item so we can - // attempt processing again later. This could have been caused by a - // temporary network failure, or any other transient reason. if err != nil { return nil, err } + // Update VirtualService if it has changed. This is possible since in version 0.2.0, the destinationHost in + // spec.HTTP.Route for the dispatcher was changed from *-clusterbus to *-dispatcher. Even otherwise, this + // reconciliation is useful for the future mutations to the object. + expected := newVirtualService(channel) + if !equality.Semantic.DeepDerivative(expected.Spec, virtualService.Spec) { + virtualService.Spec = expected.Spec + err := client.Update(ctx, virtualService) + if err != nil { + return nil, err + } + } return virtualService, nil } @@ -124,7 +136,7 @@ func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { } return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: controller.ChannelServiceName(c.ObjectMeta.Name), + Name: ChannelServiceName(c.ObjectMeta.Name), Namespace: c.Namespace, Labels: labels, OwnerReferences: []metav1.OwnerReference{ @@ -154,10 +166,10 @@ func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.Virtual "channel": channel.Name, "provisioner": channel.Spec.Provisioner.Name, } - destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace) + destinationHost := controller.ServiceHostName(ChannelDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace) return &istiov1alpha3.VirtualService{ ObjectMeta: metav1.ObjectMeta{ - Name: controller.ChannelVirtualServiceName(channel.Name), + Name: ChannelVirtualServiceName(channel.Name), Namespace: channel.Namespace, Labels: labels, OwnerReferences: []metav1.OwnerReference{ @@ -170,12 +182,12 @@ func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.Virtual }, Spec: istiov1alpha3.VirtualServiceSpec{ Hosts: []string{ - controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace), - controller.ChannelHostName(channel.Name, channel.Namespace), + controller.ServiceHostName(ChannelServiceName(channel.Name), channel.Namespace), + ChannelHostName(channel.Name, channel.Namespace), }, Http: []istiov1alpha3.HTTPRoute{{ Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: controller.ChannelHostName(channel.Name, channel.Namespace), + Authority: ChannelHostName(channel.Name, channel.Namespace), }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ @@ -189,3 +201,15 @@ func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.Virtual }, } } + +func ChannelVirtualServiceName(channelName string) string { + return fmt.Sprintf("%s-channel", channelName) +} + +func ChannelServiceName(channelName string) string { + return fmt.Sprintf("%s-channel", channelName) +} + +func ChannelHostName(channelName, namespace string) string { + return fmt.Sprintf("%s.%s.channels.cluster.local", channelName, namespace) +} diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 0503ee778ec..ed6af36c056 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -33,65 +33,122 @@ func init() { eventingv1alpha1.AddToScheme(scheme.Scheme) } -func TestCreateK8sService(t *testing.T) { - want := makeK8sService() - client := fake.NewFakeClient() - got, _ := CreateK8sService(context.TODO(), client, getNewChannel()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("Service (-want, +got) = %v", diff) - } -} - -func TestCreateK8sService_Existing(t *testing.T) { - want := makeK8sService() - client := fake.NewFakeClient(want) - got, _ := CreateK8sService(context.TODO(), client, getNewChannel()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("Service (-want, +got) = %v", diff) - } -} - -func TestCreateVirtualService(t *testing.T) { - want := makeVirtualService() - client := fake.NewFakeClient() - got, _ := CreateVirtualService(context.TODO(), client, getNewChannel()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("VirtualService (-want, +got) = %v", diff) - } -} - -func TestCreateVirtualService_Existing(t *testing.T) { - want := makeVirtualService() - client := fake.NewFakeClient(want) - got, _ := CreateVirtualService(context.TODO(), client, getNewChannel()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("VirtualService (-want, +got) = %v", diff) +func TestChannelUtils(t *testing.T) { + testCases := []struct { + name string + f func() (metav1.Object, error) + want metav1.Object + }{{ + name: "CreateK8sService", + f: func() (metav1.Object, error) { + client := fake.NewFakeClient() + return CreateK8sService(context.TODO(), client, getNewChannel()) + }, + want: makeK8sService(), + }, { + name: "CreateK8sService_Existing", + f: func() (metav1.Object, error) { + existing := makeK8sService() + client := fake.NewFakeClient(existing) + return CreateK8sService(context.TODO(), client, getNewChannel()) + }, + want: makeK8sService(), + }, { + name: "CreateVirtualService", + f: func() (metav1.Object, error) { + client := fake.NewFakeClient() + return CreateVirtualService(context.TODO(), client, getNewChannel()) + }, + want: makeVirtualService(), + }, { + name: "CreateVirtualService_Existing", + f: func() (metav1.Object, error) { + existing := makeVirtualService() + client := fake.NewFakeClient(existing) + return CreateVirtualService(context.TODO(), client, getNewChannel()) + }, + want: makeVirtualService(), + }, { + name: "CreateVirtualService_ModifiedSpec", + f: func() (metav1.Object, error) { + existing := makeVirtualService() + destHost := fmt.Sprintf("%s-clusterbus.knative-eventing.svc.cluster.local", clusterChannelProvisionerName) + existing.Spec.Http[0].Route[0].Destination.Host = destHost + client := fake.NewFakeClient(existing) + CreateVirtualService(context.TODO(), client, getNewChannel()) + + got := &istiov1alpha3.VirtualService{} + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: fmt.Sprintf("%s-channel", channelName)}, got) + return got, err + }, + want: makeVirtualService(), + }, { + name: "UpdateChannel", + f: func() (metav1.Object, error) { + oldChannel := getNewChannel() + client := fake.NewFakeClient(oldChannel) + + AddFinalizer(oldChannel, "test-finalizer") + oldChannel.Status.SetAddress("test-domain") + UpdateChannel(context.TODO(), client, oldChannel) + + got := &eventingv1alpha1.Channel{} + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: channelName}, got) + return got, err + }, + want: func() metav1.Object { + channel := getNewChannel() + AddFinalizer(channel, "test-finalizer") + channel.Status.SetAddress("test-domain") + return channel + }(), + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + got, err := tc.f() + if err != nil { + t.Errorf("Unexpected error %+v", err) + } + if diff := cmp.Diff(tc.want, got, ignore); diff != "" { + t.Errorf("%s (-want, +got) = %v", tc.name, diff) + } + }) } } -func TestUpdateChannel(t *testing.T) { - oldChannel := getNewChannel() - client := fake.NewFakeClient(oldChannel) - - want := getNewChannel() - AddFinalizer(want, "test-finalizer") - want.Status.SetAddress("test-domain") - UpdateChannel(context.TODO(), client, want) - - got := &eventingv1alpha1.Channel{} - client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: channelName}, got) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("Channel (-want, +got) = %v", diff) +func TestChannelNames(t *testing.T) { + testCases := []struct { + Name string + F func() string + Want string + }{{ + Name: "ChannelVirtualServiceName", + F: func() string { + return ChannelVirtualServiceName("foo") + }, + Want: "foo-channel", + }, { + Name: "ChannelServiceName", + F: func() string { + return ChannelServiceName("foo") + }, + Want: "foo-channel", + }, { + Name: "ChannelHostName", + F: func() string { + return ChannelHostName("foo", "namespace") + }, + Want: "foo.namespace.channels.cluster.local", + }} + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + if got := tc.F(); got != tc.Want { + t.Errorf("want %v, got %v", tc.Want, got) + } + }) } } @@ -186,7 +243,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: fmt.Sprintf("%s-clusterbus.knative-eventing.svc.cluster.local", clusterChannelProvisionerName), + Host: fmt.Sprintf("%s-dispatcher.knative-eventing.svc.cluster.local", clusterChannelProvisionerName), Port: istiov1alpha3.PortSelector{ Number: PortNumber, }, diff --git a/pkg/provisioners/kafka/dispatcher/dispatcher.go b/pkg/provisioners/kafka/dispatcher/dispatcher.go index c954d464eac..2b79c2f3afb 100644 --- a/pkg/provisioners/kafka/dispatcher/dispatcher.go +++ b/pkg/provisioners/kafka/dispatcher/dispatcher.go @@ -27,7 +27,7 @@ import ( "go.uber.org/zap" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/provisioners/kafka/controller" topicUtils "github.com/knative/eventing/pkg/provisioners/utils" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" @@ -37,11 +37,11 @@ type KafkaDispatcher struct { config atomic.Value updateLock sync.Mutex - receiver *buses.MessageReceiver - dispatcher *buses.MessageDispatcher + receiver *provisioners.MessageReceiver + dispatcher *provisioners.MessageDispatcher kafkaAsyncProducer sarama.AsyncProducer - kafkaConsumers map[buses.ChannelReference]map[subscription]KafkaConsumer + kafkaConsumers map[provisioners.ChannelReference]map[subscription]KafkaConsumer kafkaCluster KafkaCluster logger *zap.Logger @@ -96,7 +96,7 @@ func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error // Subscribe to new subscriptions for _, cc := range config.ChannelConfigs { - channelRef := buses.ChannelReference{ + channelRef := provisioners.ChannelReference{ Name: cc.Name, Namespace: cc.Namespace, } @@ -161,7 +161,7 @@ func (d *KafkaDispatcher) Start(stopCh <-chan struct{}) error { return nil } -func (d *KafkaDispatcher) subscribe(channelRef buses.ChannelReference, sub subscription) error { +func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, sub subscription) error { d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) @@ -204,7 +204,7 @@ func (d *KafkaDispatcher) subscribe(channelRef buses.ChannelReference, sub subsc return nil } -func (d *KafkaDispatcher) unsubscribe(channel buses.ChannelReference, sub subscription) error { +func (d *KafkaDispatcher) unsubscribe(channel provisioners.ChannelReference, sub subscription) error { d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub)) if consumer, ok := d.kafkaConsumers[channel][sub]; ok { delete(d.kafkaConsumers[channel], sub) @@ -215,8 +215,8 @@ func (d *KafkaDispatcher) unsubscribe(channel buses.ChannelReference, sub subscr // dispatchMessage sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. -func (d *KafkaDispatcher) dispatchMessage(m *buses.Message, sub subscription) error { - return d.dispatcher.DispatchMessage(m, sub.SubscriberURI, sub.ReplyURI, buses.DispatchDefaults{}) +func (d *KafkaDispatcher) dispatchMessage(m *provisioners.Message, sub subscription) error { + return d.dispatcher.DispatchMessage(m, sub.SubscriberURI, sub.ReplyURI, provisioners.DispatchDefaults{}) } func (d *KafkaDispatcher) getConfig() *multichannelfanout.Config { @@ -243,16 +243,16 @@ func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, erro } dispatcher := &KafkaDispatcher{ - dispatcher: buses.NewMessageDispatcher(logger.Sugar()), + dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), kafkaCluster: &saramaCluster{kafkaBrokers: brokers}, - kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), + kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer), kafkaAsyncProducer: producer, logger: logger, } - receiverFunc := buses.NewMessageReceiver( - func(channel buses.ChannelReference, message *buses.Message) error { + receiverFunc := provisioners.NewMessageReceiver( + func(channel provisioners.ChannelReference, message *provisioners.Message) error { dispatcher.kafkaAsyncProducer.Input() <- toKafkaMessage(channel, message) return nil }, logger.Sugar()) @@ -261,19 +261,19 @@ func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, erro return dispatcher, nil } -func fromKafkaMessage(kafkaMessage *sarama.ConsumerMessage) *buses.Message { +func fromKafkaMessage(kafkaMessage *sarama.ConsumerMessage) *provisioners.Message { headers := make(map[string]string) for _, header := range kafkaMessage.Headers { headers[string(header.Key)] = string(header.Value) } - message := buses.Message{ + message := provisioners.Message{ Headers: headers, Payload: kafkaMessage.Value, } return &message } -func toKafkaMessage(channel buses.ChannelReference, message *buses.Message) *sarama.ProducerMessage { +func toKafkaMessage(channel provisioners.ChannelReference, message *provisioners.Message) *sarama.ProducerMessage { kafkaMessage := sarama.ProducerMessage{ Topic: topicUtils.TopicName(controller.KafkaChannelSeparator, channel.Namespace, channel.Name), Value: sarama.ByteEncoder(message.Payload), diff --git a/pkg/provisioners/kafka/dispatcher/dispatcher_test.go b/pkg/provisioners/kafka/dispatcher/dispatcher_test.go index 7d3de1af286..8795964a5f8 100644 --- a/pkg/provisioners/kafka/dispatcher/dispatcher_test.go +++ b/pkg/provisioners/kafka/dispatcher/dispatcher_test.go @@ -14,7 +14,7 @@ import ( "k8s.io/api/core/v1" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" ) @@ -239,7 +239,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { t.Logf("Running %s", t.Name()) d := &KafkaDispatcher{ kafkaCluster: &mockSaramaCluster{closed: true}, - kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), + kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer), logger: zap.NewNop(), } @@ -301,7 +301,7 @@ func TestFromKafkaMessage(t *testing.T) { }, Value: data, } - want := &buses.Message{ + want := &provisioners.Message{ Headers: map[string]string{ "k1": "v1", }, @@ -315,11 +315,11 @@ func TestFromKafkaMessage(t *testing.T) { func TestToKafkaMessage(t *testing.T) { data := []byte("data") - channelRef := buses.ChannelReference{ + channelRef := provisioners.ChannelReference{ Name: "test-channel", Namespace: "test-ns", } - msg := &buses.Message{ + msg := &provisioners.Message{ Headers: map[string]string{ "k1": "v1", }, @@ -365,8 +365,8 @@ func TestSubscribe(t *testing.T) { data := []byte("data") d := &KafkaDispatcher{ kafkaCluster: sc, - kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), - dispatcher: buses.NewMessageDispatcher(zap.NewNop().Sugar()), + kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer), + dispatcher: provisioners.NewMessageDispatcher(zap.NewNop().Sugar()), logger: zap.NewNop(), } @@ -378,7 +378,7 @@ func TestSubscribe(t *testing.T) { server := httptest.NewServer(testHandler) defer server.Close() - channelRef := buses.ChannelReference{ + channelRef := provisioners.ChannelReference{ Name: "test-channel", Namespace: "test-ns", } @@ -416,7 +416,7 @@ func TestSubscribeError(t *testing.T) { logger: zap.NewNop(), } - channelRef := buses.ChannelReference{ + channelRef := provisioners.ChannelReference{ Name: "test-channel", Namespace: "test-ns", } @@ -440,7 +440,7 @@ func TestUnsubscribeUnknownSub(t *testing.T) { logger: zap.NewNop(), } - channelRef := buses.ChannelReference{ + channelRef := provisioners.ChannelReference{ Name: "test-channel", Namespace: "test-ns", } @@ -462,7 +462,7 @@ func TestKafkaDispatcher_Start(t *testing.T) { t.Errorf("expected error want %s, got %s", "message receiver is not set", err) } - d.receiver = buses.NewMessageReceiver(func(channel buses.ChannelReference, message *buses.Message) error { + d.receiver = provisioners.NewMessageReceiver(func(channel provisioners.ChannelReference, message *provisioners.Message) error { return nil }, zap.NewNop().Sugar()) err = d.Start(make(chan struct{})) diff --git a/pkg/provisioners/logging.go b/pkg/provisioners/logger.go similarity index 100% rename from pkg/provisioners/logging.go rename to pkg/provisioners/logger.go diff --git a/pkg/buses/logger_test.go b/pkg/provisioners/logger_test.go similarity index 74% rename from pkg/buses/logger_test.go rename to pkg/provisioners/logger_test.go index 5c7960229be..caa43c23160 100644 --- a/pkg/buses/logger_test.go +++ b/pkg/provisioners/logger_test.go @@ -14,28 +14,30 @@ See the License for the specific language governing permissions and limitations under the License. */ -package buses_test +package provisioners import ( "testing" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" - - "github.com/knative/eventing/pkg/buses" + "go.uber.org/zap/zapcore" ) func TestNewLoggingConfig(t *testing.T) { - config := buses.NewLoggingConfig() - expected := zap.InfoLevel - actual := config.LoggingLevel["bus"] - if expected != actual { + config := NewLoggingConfig() + expected := map[string]zapcore.Level{ + "provisioner": zap.InfoLevel, + } + actual := config.LoggingLevel + if cmp.Diff(expected, actual) != "" { t.Errorf("%s expected: %+v got: %+v", "Logging level", expected, actual) } } func TestNewBusLoggerFromConfig(t *testing.T) { - config := buses.NewLoggingConfig() - logger := buses.NewBusLoggerFromConfig(config) + config := NewLoggingConfig() + logger := NewProvisionerLoggerFromConfig(config) expected := true actual := logger.Desugar().Core().Enabled(zap.InfoLevel) if expected != actual { diff --git a/pkg/buses/message.go b/pkg/provisioners/message.go similarity index 91% rename from pkg/buses/message.go rename to pkg/provisioners/message.go index 23a45b690a7..ede9f7f54a9 100644 --- a/pkg/buses/message.go +++ b/pkg/provisioners/message.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package buses +package provisioners import ( "errors" @@ -36,7 +36,7 @@ var forwardPrefixes = []string{ "x-ot-", } -// Message represents an chunk of data within a bus. The message contains both +// Message represents an chunk of data within a channel dispatcher. The message contains both // a map of string headers and a binary payload. // // A message may represent a CloudEvent. @@ -51,7 +51,7 @@ type Message struct { Payload []byte } -// ErrUnknownChannel is returned when a message is received by a bus for a +// ErrUnknownChannel is returned when a message is received by a channel dispatcher for a // channel that does not exist. var ErrUnknownChannel = errors.New("unknown channel") diff --git a/pkg/buses/message_dispatcher.go b/pkg/provisioners/message_dispatcher.go similarity index 99% rename from pkg/buses/message_dispatcher.go rename to pkg/provisioners/message_dispatcher.go index 39346b7eb2d..2e0eadae77f 100644 --- a/pkg/buses/message_dispatcher.go +++ b/pkg/provisioners/message_dispatcher.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package buses +package provisioners import ( "bytes" diff --git a/pkg/buses/message_dispatcher_test.go b/pkg/provisioners/message_dispatcher_test.go similarity index 99% rename from pkg/buses/message_dispatcher_test.go rename to pkg/provisioners/message_dispatcher_test.go index ab173169080..ac02f645ad4 100644 --- a/pkg/buses/message_dispatcher_test.go +++ b/pkg/provisioners/message_dispatcher_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package buses +package provisioners import ( "bytes" diff --git a/pkg/buses/message_receiver.go b/pkg/provisioners/message_receiver.go similarity index 98% rename from pkg/buses/message_receiver.go rename to pkg/provisioners/message_receiver.go index a4aa93f4435..fa78ac57f5b 100644 --- a/pkg/buses/message_receiver.go +++ b/pkg/provisioners/message_receiver.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package buses +package provisioners import ( "fmt" @@ -25,7 +25,7 @@ import ( "go.uber.org/zap" ) -// MessageReceiver starts a server to receive new messages for the bus. The new +// MessageReceiver starts a server to receive new messages for the channel dispatcher. The new // message is emitted via the receiver function. const ( MessageReceiverPort = 8080 diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index 787a1c58155..3448d05a692 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -13,14 +13,15 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "fmt" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/controller" "github.com/knative/eventing/pkg/system" "github.com/knative/pkg/logging" ) func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner) (*corev1.Service, error) { - svcName := controller.ClusterBusDispatcherServiceName(ccp.Name) + svcName := ChannelDispatcherServiceName(ccp.Name) svcKey := types.NamespacedName{ Namespace: system.Namespace, Name: svcName, @@ -31,13 +32,26 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c if errors.IsNotFound(err) { svc = newDispatcherService(ccp) err = client.Create(ctx, svc) + if err != nil { + return nil, err + } + return svc, nil } - - // If an error occurred in either Get or Create, we need to reconcile again. if err != nil { return nil, err } + expected := newDispatcherService(ccp) + // spec.clusterIP is immutable and is set on existing services. If we don't set this + // to the same value, we will encounter an error while updating. + expected.Spec.ClusterIP = svc.Spec.ClusterIP + if !equality.Semantic.DeepDerivative(expected.Spec, svc.Spec) { + svc.Spec = expected.Spec + err := client.Update(ctx, svc) + if err != nil { + return nil, err + } + } return svc, nil } @@ -63,7 +77,7 @@ func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *core labels := DispatcherLabels(ccp.Name) return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: controller.ClusterBusDispatcherServiceName(ccp.Name), + Name: ChannelDispatcherServiceName(ccp.Name), Namespace: system.Namespace, Labels: labels, OwnerReferences: []metav1.OwnerReference{ @@ -93,3 +107,7 @@ func DispatcherLabels(ccpName string) map[string]string { "role": "dispatcher", } } + +func ChannelDispatcherServiceName(ccpName string) string { + return fmt.Sprintf("%s-dispatcher", ccpName) +} diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index b93e7b780be..cd46ef9db56 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -19,50 +19,120 @@ import ( ) const ( - clusterChannelProvisionerName = "kafka" + clusterChannelProvisionerName = "kafka" + otherClusterChannelProvisionerName = "kafka-new" + testClusterIP = "10.59.249.3" ) -func TestCreateDispatcherService(t *testing.T) { - want := makeDispatcherService() - client := fake.NewFakeClient() - got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("Service (-want, +got) = %v", diff) - } -} - -func TestCreateDispatcherService_Existing(t *testing.T) { - want := makeDispatcherService() - client := fake.NewFakeClient(want) - got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("Service (-want, +got) = %v", diff) +func TestProvisionerUtils(t *testing.T) { + testCases := []struct { + name string + f func() (metav1.Object, error) + want metav1.Object + }{{ + name: "CreateDispatcherService", + f: func() (metav1.Object, error) { + client := fake.NewFakeClient() + return CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) + }, + want: makeDispatcherService(), + }, { + name: "CreateDispatcherService_Existing", + f: func() (metav1.Object, error) { + existing := makeDispatcherService() + client := fake.NewFakeClient(existing) + return CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) + }, + want: makeDispatcherService(), + }, { + name: "CreateDispatcherService_ModifiedSpec", + f: func() (metav1.Object, error) { + existing := makeDispatcherService() + existing.Spec.Selector = map[string]string{ + "clusterChannelProvisioner": otherClusterChannelProvisionerName, + "role": "dispatcher", + } + client := fake.NewFakeClient(existing) + CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) + + got := &corev1.Service{} + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace, Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) + return got, err + }, + want: makeDispatcherService(), + }, { + name: "CreateDispatcherService_DoNotModifyClusterIP", + f: func() (metav1.Object, error) { + existing := makeDispatcherService() + existing.Spec.ClusterIP = testClusterIP + client := fake.NewFakeClient(existing) + CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) + + got := &corev1.Service{} + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace, Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) + return got, err + }, + want: func() metav1.Object { + svc := makeDispatcherService() + svc.Spec.ClusterIP = testClusterIP + return svc + }(), + }, { + name: "UpdateClusterChannelProvisioner", + f: func() (metav1.Object, error) { + ccp := getNewClusterChannelProvisioner() + client := fake.NewFakeClient(ccp) + + // Update more than just Status + ccp.Status.MarkReady() + ccp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"} + UpdateClusterChannelProvisionerStatus(context.TODO(), client, ccp) + + got := &eventingv1alpha1.ClusterChannelProvisioner{} + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterChannelProvisionerName}, got) + return got, err + }, + want: func() metav1.Object { + // Only status should be updated + ccp := getNewClusterChannelProvisioner() + ccp.Status.MarkReady() + return ccp + }(), + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + got, err := tc.f() + if err != nil { + t.Errorf("Unexpected error %+v", err) + } + if diff := cmp.Diff(tc.want, got, ignore); diff != "" { + t.Errorf("%s (-want, +got) = %v", tc.name, diff) + } + }) } } -func TestUpdateClusterChannelProvisioner(t *testing.T) { - ccp := getNewClusterChannelProvisioner() - client := fake.NewFakeClient(ccp) - - // Update more than just Status - ccp.Status.MarkReady() - ccp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"} - UpdateClusterChannelProvisionerStatus(context.TODO(), client, ccp) - - got := &eventingv1alpha1.ClusterChannelProvisioner{} - client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterChannelProvisionerName}, got) - - // Only status should be updated - want := getNewClusterChannelProvisioner() - want.Status.MarkReady() - - ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) - if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("ClusterChannelProvisioner (-want, +got) = %v", diff) +func TestNames(t *testing.T) { + testCases := []struct { + Name string + F func() string + Want string + }{{ + Name: "ChannelDispatcherServiceName", + F: func() string { + return ChannelDispatcherServiceName("foo") + }, + Want: "foo-dispatcher", + }} + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + if got := tc.F(); got != tc.Want { + t.Errorf("want %v, got %v", tc.Want, got) + } + }) } } @@ -88,7 +158,7 @@ func makeDispatcherService() *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace, - Name: fmt.Sprintf("%s-clusterbus", clusterChannelProvisionerName), + Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName), OwnerReferences: []metav1.OwnerReference{ { APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), diff --git a/pkg/buses/references.go b/pkg/provisioners/references.go similarity index 50% rename from pkg/buses/references.go rename to pkg/provisioners/references.go index 7455022fee5..39045bb9b1d 100644 --- a/pkg/buses/references.go +++ b/pkg/provisioners/references.go @@ -14,12 +14,10 @@ * limitations under the License. */ -package buses +package provisioners import ( "fmt" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) // ChannelReference references a Channel within the cluster by name and @@ -29,26 +27,6 @@ type ChannelReference struct { Name string } -// NewChannelReference creates a ChannelReference from a Channel -func NewChannelReference(channel *eventingv1alpha1.Channel) ChannelReference { - return NewChannelReferenceFromNames(channel.Name, channel.Namespace) -} - -// NewChannelReferenceFromSubscription creates a ChannelReference from a -// Subscription for a Channel. -func NewChannelReferenceFromSubscription(subscription *eventingv1alpha1.Subscription) ChannelReference { - return NewChannelReferenceFromNames(subscription.Spec.Channel.Name, subscription.Namespace) -} - -// NewChannelReferenceFromNames creates a ChannelReference for a name and -// namespace. -func NewChannelReferenceFromNames(name, namespace string) ChannelReference { - return ChannelReference{ - Namespace: namespace, - Name: name, - } -} - func (r *ChannelReference) String() string { return fmt.Sprintf("%s/%s", r.Namespace, r.Name) } diff --git a/pkg/provisioners/references_test.go b/pkg/provisioners/references_test.go new file mode 100644 index 00000000000..04e2b4b9aac --- /dev/null +++ b/pkg/provisioners/references_test.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provisioners + +import ( + "fmt" + "testing" +) + +const ( + referencesTestNamespace = "test-namespace" + referencesTestChannelName = "test-channel" +) + +func TestChannelReference_String(t *testing.T) { + ref := ChannelReference{ + Name: referencesTestChannelName, + Namespace: referencesTestNamespace, + } + expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestChannelName) + actual := ref.String() + if expected != actual { + t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) + } +} diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index f5873c527a8..dd60be7fed0 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -27,7 +27,7 @@ import ( "time" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" ) @@ -47,8 +47,8 @@ type Handler struct { config Config receivedMessages chan *forwardMessage - receiver *buses.MessageReceiver - dispatcher *buses.MessageDispatcher + receiver *provisioners.MessageReceiver + dispatcher *provisioners.MessageDispatcher // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, // rather than a member variable. @@ -61,7 +61,7 @@ var _ http.Handler = &Handler{} // forwardMessage is passed between the Receiver and the Dispatcher. type forwardMessage struct { - msg *buses.Message + msg *provisioners.Message done chan<- error } @@ -70,19 +70,19 @@ func NewHandler(logger *zap.Logger, config Config) *Handler { handler := &Handler{ logger: logger, config: config, - dispatcher: buses.NewMessageDispatcher(logger.Sugar()), + dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), receivedMessages: make(chan *forwardMessage, messageBufferSize), timeout: defaultTimeout, } // The receiver function needs to point back at the handler itself, so set it up after // initialization. - handler.receiver = buses.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) + handler.receiver = provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) return handler } -func createReceiverFunction(f *Handler) func(buses.ChannelReference, *buses.Message) error { - return func(_ buses.ChannelReference, m *buses.Message) error { +func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { + return func(_ provisioners.ChannelReference, m *provisioners.Message) error { return f.dispatch(m) } } @@ -93,7 +93,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out // requests return successfully, then return nil. Else, return an error. -func (f *Handler) dispatch(msg *buses.Message) error { +func (f *Handler) dispatch(msg *provisioners.Message) error { errorCh := make(chan error, len(f.config.Subscriptions)) for _, sub := range f.config.Subscriptions { go func(s eventingduck.ChannelSubscriberSpec) { @@ -119,6 +119,6 @@ func (f *Handler) dispatch(msg *buses.Message) error { // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. -func (f *Handler) makeFanoutRequest(m buses.Message, sub eventingduck.ChannelSubscriberSpec) error { - return f.dispatcher.DispatchMessage(&m, sub.SubscriberURI, sub.ReplyURI, buses.DispatchDefaults{}) +func (f *Handler) makeFanoutRequest(m provisioners.Message, sub eventingduck.ChannelSubscriberSpec) error { + return f.dispatcher.DispatchMessage(&m, sub.SubscriberURI, sub.ReplyURI, provisioners.DispatchDefaults{}) } diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index a61f389c549..6702984154d 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -27,7 +27,7 @@ import ( "time" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" - "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -58,7 +58,7 @@ var ( func TestFanoutHandler_ServeHTTP(t *testing.T) { testCases := map[string]struct { - receiverFunc func(buses.ChannelReference, *buses.Message) error + receiverFunc func(provisioners.ChannelReference, *provisioners.Message) error timeout time.Duration subs []eventingduck.ChannelSubscriberSpec subscriber func(http.ResponseWriter, *http.Request) @@ -66,7 +66,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus int }{ "rejected by receiver": { - receiverFunc: func(buses.ChannelReference, *buses.Message) error { + receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error { return errors.New("rejected by test-receiver") }, expectedStatus: http.StatusInternalServerError, @@ -207,7 +207,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) if tc.receiverFunc != nil { - h.receiver = buses.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) + h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) } if tc.timeout != 0 { h.timeout = tc.timeout diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go index 3882bb6a2ae..d74a5c902ab 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go @@ -27,11 +27,12 @@ package multichannelfanout import ( "fmt" + "net/http" + "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/sidecar/fanout" "go.uber.org/zap" - "net/http" ) // The configuration of this handler. @@ -59,7 +60,7 @@ func makeChannelKeyFromConfig(config ChannelConfig) string { // getChannelKey extracts the channel key from the given HTTP request. func getChannelKey(r *http.Request) string { - cr := buses.ParseChannel(r.Host) + cr := provisioners.ParseChannel(r.Host) return makeChannelKey(cr.Namespace, cr.Name) }