From 031c09fef3814ca553db154ceb91beab55654f0e Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Mon, 28 Jan 2019 17:13:13 -0500 Subject: [PATCH 1/7] Consolidating commits Signed-off-by: Serguei Bezverkhi --- .../in-memory-channel/in-memory-channel.yaml | 12 +++++-- .../eventing/inmemory/channel/reconcile.go | 35 +++++++++++++++---- .../inmemory/channel/reconcile_test.go | 9 ++--- .../clusterchannelprovisioner/reconcile.go | 15 +++++--- .../reconcile_test.go | 1 + .../eventing/inmemory/controller/main.go | 2 +- pkg/sidecar/fanout/fanout_handler.go | 10 ++++++ test/e2e-tests.sh | 21 ++++++++++- 8 files changed, 86 insertions(+), 19 deletions(-) diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index 0445e341e22..00ad7f5fbf9 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -12,6 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +apiVersion: eventing.knative.dev/v1alpha1 +kind: ClusterChannelProvisioner +metadata: + name: in-memory +spec: {} + +--- + apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterChannelProvisioner metadata: @@ -128,7 +136,7 @@ spec: replicas: 1 selector: matchLabels: &labels - clusterChannelProvisioner: in-memory-channel + clusterChannelProvisioner: in-memory role: controller template: metadata: @@ -191,7 +199,7 @@ spec: replicas: 1 selector: matchLabels: &labels - clusterChannelProvisioner: in-memory-channel + clusterChannelProvisioner: in-memory role: dispatcher template: metadata: diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 079a3b62d38..6c119c6a31f 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -39,13 +39,14 @@ import ( const ( finalizerName = controllerAgentName - // Name of the corev1.Events emitted from the reconciliation process channelReconciled = "ChannelReconciled" channelUpdateStatusFailed = "ChannelUpdateStatusFailed" channelConfigSyncFailed = "ChannelConfigSyncFailed" k8sServiceCreateFailed = "K8sServiceCreateFailed" virtualServiceCreateFailed = "VirtualServiceCreateFailed" + // TODO after in-memory-channel is retired, asyncProvisionerName should be removed + asyncProvisionerName = "in-memory" ) type reconciler struct { @@ -110,7 +111,6 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err) return reconcile.Result{}, updateStatusErr } - return reconcile.Result{}, err } @@ -157,11 +157,26 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace)) - _, err = util.CreateVirtualService(ctx, r.client, c, svc) - if err != nil { - logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) - r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) - return err + if c.Spec.Provisioner.Name == asyncProvisionerName { + _, err = util.CreateVirtualService(ctx, r.client, c, svc) + if err != nil { + logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) + r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) + return err + } + } else { + // We need to have a single dispatcher that is pointed at by _both_ + // ClusterChannelProvisioners. So fake the channel, by saying that its provisioner is the + // one with the single dispatcher. The faked provisioner is used only to determine the + // dispatcher Service's name. + cCopy := c.DeepCopy() + cCopy.Spec.Provisioner.Name = asyncProvisionerName + _, err = util.CreateVirtualService(ctx, r.client, cCopy, svc) + if err != nil { + logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) + r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) + return err + } } c.Status.MarkProvisioned() @@ -225,8 +240,14 @@ func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannel Name: c.Name, } if c.Spec.Subscribable != nil { + // TODO After in-memory-channel is retired, this logic must be refactored. + asyncHandler := false + if c.Spec.Provisioner.Name == asyncProvisionerName { + asyncHandler = true + } channelConfig.FanoutConfig = fanout.Config{ Subscriptions: c.Spec.Subscribable.Subscribers, + AsyncHandler: asyncHandler, } } cc = append(cc, channelConfig) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 7c698de1262..c342c0bfa42 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -43,7 +43,8 @@ import ( ) const ( - ccpName = "in-memory-channel" + ccpName = "in-memory-channel" + asyncCCPName = "in-memory" cNamespace = "test-namespace" cName = "test-channel" @@ -680,8 +681,8 @@ func makeVirtualService() *istiov1alpha3.VirtualService { Labels: map[string]string{ util.EventingChannelLabel: cName, util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: ccpName, - util.OldEventingProvisionerLabel: ccpName, + util.EventingProvisionerLabel: asyncCCPName, + util.OldEventingProvisionerLabel: asyncCCPName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -705,7 +706,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "in-memory-channel-dispatcher.knative-eventing.svc.cluster.local", + Host: "in-memory-dispatcher.knative-eventing.svc.cluster.local", Port: istiov1alpha3.PortSelector{ Number: util.PortNumber, }, diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go index 15e861f8896..04e19ca8933 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go @@ -35,9 +35,6 @@ import ( ) const ( - // Name is the name of the in-memory channel ClusterChannelProvisioner. - Name = "in-memory-channel" - // Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1. Channel = "Channel" @@ -48,6 +45,11 @@ const ( k8sServiceDeleteFailed = "K8sServiceDeleteFailed" ) +var ( + // provisionerNames contains the list of provisioners' names served by this controller + provisionerNames = []string{"in-memory-channel", "in-memory"} +) + type reconciler struct { client client.Client recorder record.EventRecorder @@ -130,7 +132,12 @@ func IsControlled(ref *corev1.ObjectReference) bool { // shouldReconcile determines if this Controller should control (and therefore reconcile) a given // ClusterChannelProvisioner. This Controller only handles in-memory channels. func shouldReconcile(namespace, name string) bool { - return namespace == "" && name == Name + for _, p := range provisionerNames { + if namespace == "" && name == p { + return true + } + } + return false } func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error { diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go index 4ad88ddd2de..e65117c4df4 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go @@ -42,6 +42,7 @@ const ( ccpUid = "test-uid" testErrorMessage = "test-induced-error" testNS = "test-ns" + Name = "in-memory-channel" ) var ( diff --git a/pkg/controller/eventing/inmemory/controller/main.go b/pkg/controller/eventing/inmemory/controller/main.go index a16db0beb94..cae87d8f318 100644 --- a/pkg/controller/eventing/inmemory/controller/main.go +++ b/pkg/controller/eventing/inmemory/controller/main.go @@ -35,7 +35,7 @@ func main() { logger := provisioners.NewProvisionerLoggerFromConfig(logConfig) defer logger.Sync() logger = logger.With( - zap.String("eventing.knative.dev/clusterChannelProvisioner", clusterchannelprovisioner.Name), + zap.String("eventing.knative.dev/clusterChannelProvisioner", "in-memory"), zap.String("eventing.knative.dev/clusterChannelProvisionerComponent", "Controller"), ) flag.Parse() diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index dd60be7fed0..02d60d40bf6 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -40,6 +40,9 @@ const ( // Configuration for a fanout.Handler. type Config struct { Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"` + // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // It is expected to be false when used as a sidecar. + AsyncHandler bool `json:"asyncHandler,omitempty"` } // http.Handler that takes a single request in and fans it out to N other servers. @@ -83,6 +86,13 @@ func NewHandler(logger *zap.Logger, config Config) *Handler { func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { return func(_ provisioners.ChannelReference, m *provisioners.Message) error { + if f.config.AsyncHandler { + go func() { + // Any returned error is already logged in f.dispatch(). + _ = f.dispatch(m) + }() + return nil + } return f.dispatch(m) } } diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index b8381d5a0c1..17466891dfb 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -85,6 +85,25 @@ $(dirname $0)/upload-test-images.sh e2e || fail_test "Error uploading test image # Setup resources common to all eventing tests setup_events_test_resources|| fail_test "Error setting up test resources" -go_test_e2e ./test/e2e || fail_test +go_test_e2e ./test/e2e +exit_result=$? +if [ ${exit_result} -ne 0 ]; then +# Collecting logs from all knative's eventing pods + for namespace in "knative-eventing" "e2etestfn3"; do + for pod in $(kubectl get pod -n $namespace | grep Running | awk '{print $1}' ); do + echo "========== Collecting logs for pod: "${pod}" in $namespace =================" + for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do + echo "----------------------------------------------------------" + echo "Container: "${container} + kubectl logs -n $namespace "${pod}" -c "${container}" || true + echo "----------------------------------------------------------" + kubectl logs -p -n $namespace "${pod}" -c "${container}" || true + echo "----------------------------------------------------------" + done + echo "============================================================" + done + done + fail_test +fi success From 6666e3e8a20524e4e4446e4e2663e910486e9937 Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Wed, 30 Jan 2019 11:01:46 -0500 Subject: [PATCH 2/7] Addressing comments Signed-off-by: Serguei Bezverkhi --- pkg/sidecar/fanout/fanout_handler_test.go | 26 +++++++++++++++++++++++ test/e2e-tests.sh | 9 ++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index 6702984154d..1b8819bdc0b 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -64,6 +64,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { subscriber func(http.ResponseWriter, *http.Request) channel func(http.ResponseWriter, *http.Request) expectedStatus int + asyncHandler bool }{ "rejected by receiver": { receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error { @@ -178,6 +179,28 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { }, expectedStatus: http.StatusAccepted, }, + "all subs succeed with async handler": { + subs: []eventingduck.ChannelSubscriberSpec{ + { + SubscriberURI: replaceSubscriber, + ReplyURI: replaceChannel, + }, + { + SubscriberURI: replaceSubscriber, + ReplyURI: replaceChannel, + }, + { + SubscriberURI: replaceSubscriber, + ReplyURI: replaceChannel, + }, + }, + subscriber: callableSucceed, + channel: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusAccepted) + }, + expectedStatus: http.StatusAccepted, + asyncHandler: true, + }, } for n, tc := range testCases { if n != "fanout times out" { @@ -206,6 +229,9 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { } h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) + if tc.asyncHandler { + h.config.AsyncHandler = true + } if tc.receiverFunc != nil { h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) } diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 17466891dfb..31fa6feca0f 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -89,18 +89,17 @@ go_test_e2e ./test/e2e exit_result=$? if [ ${exit_result} -ne 0 ]; then # Collecting logs from all knative's eventing pods + echo "============================================================" for namespace in "knative-eventing" "e2etestfn3"; do for pod in $(kubectl get pod -n $namespace | grep Running | awk '{print $1}' ); do - echo "========== Collecting logs for pod: "${pod}" in $namespace =================" for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do - echo "----------------------------------------------------------" - echo "Container: "${container} + echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}" kubectl logs -n $namespace "${pod}" -c "${container}" || true echo "----------------------------------------------------------" + echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}" kubectl logs -p -n $namespace "${pod}" -c "${container}" || true - echo "----------------------------------------------------------" + echo "============================================================" done - echo "============================================================" done done fail_test From 7a8b7fc133560585f0eb5755f705e33f0d1c4925 Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Thu, 31 Jan 2019 13:02:19 -0500 Subject: [PATCH 3/7] Adding 2 unit tests Signed-off-by: Serguei Bezverkhi --- .../inmemory/channel/reconcile_test.go | 46 +++++++++++++++++-- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index c342c0bfa42..ed7024d7b14 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -519,6 +519,34 @@ func TestReconcile(t *testing.T) { events[channelReconciled], }, }, + { + Name: "Channel reconcile successful - Async channel", + InitialState: []runtime.Object{ + makeChannel("in-memory"), + }, + Mocks: controllertesting.Mocks{}, + WantPresent: []runtime.Object{ + makeVirtualService(), + }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, + }, + { + Name: "Channel reconcile successful - Non Async channel", + // VirtualService should have channel provisioner name + // overwritten to "in-memory" + InitialState: []runtime.Object{ + makeChannel("in-memory-channel"), + }, + Mocks: controllertesting.Mocks{}, + WantPresent: []runtime.Object{ + makeVirtualService(), + }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, + }, } for _, tc := range testCases { @@ -542,7 +570,11 @@ func TestReconcile(t *testing.T) { } } -func makeChannel() *eventingv1alpha1.Channel { +func makeChannel(pn ...string) *eventingv1alpha1.Channel { + provisionerName := ccpName + if len(pn) != 0 { + provisionerName = pn[0] + } c := &eventingv1alpha1.Channel{ TypeMeta: metav1.TypeMeta{ APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), @@ -555,7 +587,7 @@ func makeChannel() *eventingv1alpha1.Channel { }, Spec: eventingv1alpha1.ChannelSpec{ Provisioner: &corev1.ObjectReference{ - Name: ccpName, + Name: provisionerName, }, }, } @@ -632,7 +664,11 @@ func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap { return cm } -func makeK8sService() *corev1.Service { +func makeK8sService(pn ...string) *corev1.Service { + provisionerName := ccpName + if len(pn) != 0 { + provisionerName = pn[0] + } return &corev1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -644,8 +680,8 @@ func makeK8sService() *corev1.Service { Labels: map[string]string{ util.EventingChannelLabel: cName, util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: ccpName, - util.OldEventingProvisionerLabel: ccpName, + util.EventingProvisionerLabel: provisionerName, + util.OldEventingProvisionerLabel: provisionerName, }, OwnerReferences: []metav1.OwnerReference{ { From 389437313e231ab9285b6ff444d38e138a3f9df9 Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Thu, 31 Jan 2019 17:32:09 -0500 Subject: [PATCH 4/7] Addressing comments Signed-off-by: Serguei Bezverkhi --- .../eventing/inmemory/channel/reconcile.go | 6 ++-- .../inmemory/channel/reconcile_test.go | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 6c119c6a31f..44a410571c5 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -149,7 +149,9 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) util.AddFinalizer(c, finalizerName) - svc, err := util.CreateK8sService(ctx, r.client, c) + cCopy := c.DeepCopy() + cCopy.Spec.Provisioner.Name = asyncProvisionerName + svc, err := util.CreateK8sService(ctx, r.client, cCopy) if err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) @@ -169,8 +171,6 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // ClusterChannelProvisioners. So fake the channel, by saying that its provisioner is the // one with the single dispatcher. The faked provisioner is used only to determine the // dispatcher Service's name. - cCopy := c.DeepCopy() - cCopy.Spec.Provisioner.Name = asyncProvisionerName _, err = util.CreateVirtualService(ctx, r.client, cCopy, svc) if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index ed7024d7b14..69d1336b8fe 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -527,6 +527,7 @@ func TestReconcile(t *testing.T) { Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ makeVirtualService(), + makeK8sService(), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -542,6 +543,7 @@ func TestReconcile(t *testing.T) { Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ makeVirtualService(), + makeK8sService(), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -571,10 +573,6 @@ func TestReconcile(t *testing.T) { } func makeChannel(pn ...string) *eventingv1alpha1.Channel { - provisionerName := ccpName - if len(pn) != 0 { - provisionerName = pn[0] - } c := &eventingv1alpha1.Channel{ TypeMeta: metav1.TypeMeta{ APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), @@ -587,7 +585,7 @@ func makeChannel(pn ...string) *eventingv1alpha1.Channel { }, Spec: eventingv1alpha1.ChannelSpec{ Provisioner: &corev1.ObjectReference{ - Name: provisionerName, + Name: getProvisionerName(pn), }, }, } @@ -595,6 +593,16 @@ func makeChannel(pn ...string) *eventingv1alpha1.Channel { return c } +// getProvisionerName returns either default provisioner name defined by ccpName variable +// or, if specified, a custom provisioner name. +func getProvisionerName(pn []string) string { + provisionerName := ccpName + if len(pn) != 0 { + provisionerName = pn[0] + } + return provisionerName +} + func makeChannelWithFinalizerAndAddress() *eventingv1alpha1.Channel { c := makeChannelWithFinalizer() c.Status.SetAddress(serviceAddress) @@ -664,11 +672,7 @@ func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap { return cm } -func makeK8sService(pn ...string) *corev1.Service { - provisionerName := ccpName - if len(pn) != 0 { - provisionerName = pn[0] - } +func makeK8sService() *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -680,8 +684,8 @@ func makeK8sService(pn ...string) *corev1.Service { Labels: map[string]string{ util.EventingChannelLabel: cName, util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: provisionerName, - util.OldEventingProvisionerLabel: provisionerName, + util.EventingProvisionerLabel: asyncCCPName, + util.OldEventingProvisionerLabel: asyncCCPName, }, OwnerReferences: []metav1.OwnerReference{ { From a286f38c2cd61ffda29f67972172a212c04aa63b Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Fri, 1 Feb 2019 17:28:23 -0500 Subject: [PATCH 5/7] Addressing comments Signed-off-by: Serguei Bezverkhi --- pkg/controller/eventing/inmemory/channel/reconcile.go | 6 +++--- .../eventing/inmemory/channel/reconcile_test.go | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 44a410571c5..6c119c6a31f 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -149,9 +149,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) util.AddFinalizer(c, finalizerName) - cCopy := c.DeepCopy() - cCopy.Spec.Provisioner.Name = asyncProvisionerName - svc, err := util.CreateK8sService(ctx, r.client, cCopy) + svc, err := util.CreateK8sService(ctx, r.client, c) if err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) @@ -171,6 +169,8 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // ClusterChannelProvisioners. So fake the channel, by saying that its provisioner is the // one with the single dispatcher. The faked provisioner is used only to determine the // dispatcher Service's name. + cCopy := c.DeepCopy() + cCopy.Spec.Provisioner.Name = asyncProvisionerName _, err = util.CreateVirtualService(ctx, r.client, cCopy, svc) if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 69d1336b8fe..4e4251ece00 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -527,7 +527,7 @@ func TestReconcile(t *testing.T) { Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ makeVirtualService(), - makeK8sService(), + makeK8sService("in-memory"), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -543,7 +543,7 @@ func TestReconcile(t *testing.T) { Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ makeVirtualService(), - makeK8sService(), + makeK8sService("in-memory-channel"), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -672,7 +672,7 @@ func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap { return cm } -func makeK8sService() *corev1.Service { +func makeK8sService(pn ...string) *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -684,8 +684,8 @@ func makeK8sService() *corev1.Service { Labels: map[string]string{ util.EventingChannelLabel: cName, util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: asyncCCPName, - util.OldEventingProvisionerLabel: asyncCCPName, + util.EventingProvisionerLabel: getProvisionerName(pn), + util.OldEventingProvisionerLabel: getProvisionerName(pn), }, OwnerReferences: []metav1.OwnerReference{ { From 370aa48f6e8bc32c890ac1c36dd8c7600201b138 Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Fri, 1 Feb 2019 20:50:23 -0500 Subject: [PATCH 6/7] Final changes Signed-off-by: Serguei Bezverkhi --- .../eventing/inmemory/channel/reconcile.go | 8 ++++---- .../eventing/inmemory/channel/reconcile_test.go | 14 ++++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 6c119c6a31f..dd345f1d6fe 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -46,7 +46,7 @@ const ( k8sServiceCreateFailed = "K8sServiceCreateFailed" virtualServiceCreateFailed = "VirtualServiceCreateFailed" // TODO after in-memory-channel is retired, asyncProvisionerName should be removed - asyncProvisionerName = "in-memory" + defaultProvisionerName = "in-memory-channel" ) type reconciler struct { @@ -157,7 +157,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace)) - if c.Spec.Provisioner.Name == asyncProvisionerName { + if c.Spec.Provisioner.Name == defaultProvisionerName { _, err = util.CreateVirtualService(ctx, r.client, c, svc) if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) @@ -170,7 +170,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // one with the single dispatcher. The faked provisioner is used only to determine the // dispatcher Service's name. cCopy := c.DeepCopy() - cCopy.Spec.Provisioner.Name = asyncProvisionerName + cCopy.Spec.Provisioner.Name = defaultProvisionerName _, err = util.CreateVirtualService(ctx, r.client, cCopy, svc) if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) @@ -242,7 +242,7 @@ func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannel if c.Spec.Subscribable != nil { // TODO After in-memory-channel is retired, this logic must be refactored. asyncHandler := false - if c.Spec.Provisioner.Name == asyncProvisionerName { + if c.Spec.Provisioner.Name != defaultProvisionerName { asyncHandler = true } channelConfig.FanoutConfig = fanout.Config{ diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 4e4251ece00..4d80f227898 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -521,6 +521,8 @@ func TestReconcile(t *testing.T) { }, { Name: "Channel reconcile successful - Async channel", + // VirtualService should have channel provisioner name + // defaults to in-memory-channel but the service should match provisioner's service name InitialState: []runtime.Object{ makeChannel("in-memory"), }, @@ -536,14 +538,14 @@ func TestReconcile(t *testing.T) { { Name: "Channel reconcile successful - Non Async channel", // VirtualService should have channel provisioner name - // overwritten to "in-memory" + // defaults to in-memory-channel InitialState: []runtime.Object{ - makeChannel("in-memory-channel"), + makeChannel(), }, Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ makeVirtualService(), - makeK8sService("in-memory-channel"), + makeK8sService(), }, WantEvent: []corev1.Event{ events[channelReconciled], @@ -721,8 +723,8 @@ func makeVirtualService() *istiov1alpha3.VirtualService { Labels: map[string]string{ util.EventingChannelLabel: cName, util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: asyncCCPName, - util.OldEventingProvisionerLabel: asyncCCPName, + util.EventingProvisionerLabel: ccpName, + util.OldEventingProvisionerLabel: ccpName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -746,7 +748,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "in-memory-dispatcher.knative-eventing.svc.cluster.local", + Host: "in-memory-channel-dispatcher.knative-eventing.svc.cluster.local", Port: istiov1alpha3.PortSelector{ Number: util.PortNumber, }, From 079c109f012eedf8fe4d7771e79839e190ea30fb Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Fri, 1 Feb 2019 23:57:20 -0500 Subject: [PATCH 7/7] Adjusting yaml Signed-off-by: Serguei Bezverkhi --- config/provisioners/in-memory-channel/in-memory-channel.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index 00ad7f5fbf9..13d5c7e0a75 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -136,7 +136,7 @@ spec: replicas: 1 selector: matchLabels: &labels - clusterChannelProvisioner: in-memory + clusterChannelProvisioner: in-memory-channel role: controller template: metadata: @@ -199,7 +199,7 @@ spec: replicas: 1 selector: matchLabels: &labels - clusterChannelProvisioner: in-memory + clusterChannelProvisioner: in-memory-channel role: dispatcher template: metadata: