diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml
index 0445e341e22..13d5c7e0a75 100644
--- a/config/provisioners/in-memory-channel/in-memory-channel.yaml
+++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml
@@ -12,6 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+apiVersion: eventing.knative.dev/v1alpha1
+kind: ClusterChannelProvisioner
+metadata:
+  name: in-memory
+spec: {}
+
+---
+
 apiVersion: eventing.knative.dev/v1alpha1
 kind: ClusterChannelProvisioner
 metadata:
diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go
index 079a3b62d38..dd345f1d6fe 100644
--- a/pkg/controller/eventing/inmemory/channel/reconcile.go
+++ b/pkg/controller/eventing/inmemory/channel/reconcile.go
@@ -39,13 +39,14 @@ import (
 const (
 	finalizerName = controllerAgentName
 
-	// Name of the corev1.Events emitted from the reconciliation process
 	channelReconciled          = "ChannelReconciled"
 	channelUpdateStatusFailed  = "ChannelUpdateStatusFailed"
 	channelConfigSyncFailed    = "ChannelConfigSyncFailed"
 	k8sServiceCreateFailed     = "K8sServiceCreateFailed"
 	virtualServiceCreateFailed = "VirtualServiceCreateFailed"
+	// TODO: after in-memory-channel is retired, defaultProvisionerName should be removed.
+	defaultProvisionerName = "in-memory-channel"
 )
 
 type reconciler struct {
@@ -110,7 +111,6 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
 		r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err)
 		return reconcile.Result{}, updateStatusErr
 	}
-
 	return reconcile.Result{}, err
 }
 
@@ -157,11 +157,26 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
 	}
 	c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))
 
-	_, err = util.CreateVirtualService(ctx, r.client, c, svc)
-	if err != nil {
-		logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
-		r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
-		return err
+	if c.Spec.Provisioner.Name == defaultProvisionerName {
+		_, err = util.CreateVirtualService(ctx, r.client, c, svc)
+		if err != nil {
+			logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
+			r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
+			return err
+		}
+	} else {
+		// We need to have a single dispatcher that is pointed at by _both_
+		// ClusterChannelProvisioners. So fake the Channel by saying that its provisioner is
+		// the one with the single dispatcher. The faked provisioner is used only to determine
+		// the dispatcher Service's name.
+		cCopy := c.DeepCopy()
+		cCopy.Spec.Provisioner.Name = defaultProvisionerName
+		_, err = util.CreateVirtualService(ctx, r.client, cCopy, svc)
+		if err != nil {
+			logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
+			r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
+			return err
+		}
 	}
 
 	c.Status.MarkProvisioned()
@@ -225,8 +240,14 @@ func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannel
 			Name:      c.Name,
 		}
 		if c.Spec.Subscribable != nil {
+			// TODO: refactor this logic after in-memory-channel is retired.
+			asyncHandler := false
+			if c.Spec.Provisioner.Name != defaultProvisionerName {
+				asyncHandler = true
+			}
 			channelConfig.FanoutConfig = fanout.Config{
 				Subscriptions: c.Spec.Subscribable.Subscribers,
+				AsyncHandler:  asyncHandler,
 			}
 		}
 		cc = append(cc, channelConfig)
diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go
index 7c698de1262..4d80f227898 100644
--- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go
+++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go
@@ -43,7 +43,8 @@ import (
 )
 
 const (
-	ccpName = "in-memory-channel"
+	ccpName      = "in-memory-channel"
+	asyncCCPName = "in-memory"
 
 	cNamespace = "test-namespace"
 	cName      = "test-channel"
@@ -518,6 +519,38 @@ func TestReconcile(t *testing.T) {
 			events[channelReconciled],
 		},
 	},
+	{
+		Name: "Channel reconcile successful - Async channel",
+		// The VirtualService defaults to the in-memory-channel provisioner name,
+		// but the K8s Service must match the Channel's actual provisioner name.
+		InitialState: []runtime.Object{
+			makeChannel("in-memory"),
+		},
+		Mocks: controllertesting.Mocks{},
+		WantPresent: []runtime.Object{
+			makeVirtualService(),
+			makeK8sService("in-memory"),
+		},
+		WantEvent: []corev1.Event{
+			events[channelReconciled],
+		},
+	},
+	{
+		Name: "Channel reconcile successful - Non Async channel",
+		// Both the VirtualService and the K8s Service use the default
+		// in-memory-channel provisioner name.
+		InitialState: []runtime.Object{
+			makeChannel(),
+		},
+		Mocks: controllertesting.Mocks{},
+		WantPresent: []runtime.Object{
+			makeVirtualService(),
+			makeK8sService(),
+		},
+		WantEvent: []corev1.Event{
+			events[channelReconciled],
+		},
+	},
 	}
 
 	for _, tc := range testCases {
@@ -541,7 +574,7 @@ func TestReconcile(t *testing.T) {
 	}
 }
 
-func makeChannel() *eventingv1alpha1.Channel {
+func makeChannel(pn ...string) *eventingv1alpha1.Channel {
 	c := &eventingv1alpha1.Channel{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
@@ -554,7 +587,7 @@
 		},
 		Spec: eventingv1alpha1.ChannelSpec{
 			Provisioner: &corev1.ObjectReference{
-				Name: ccpName,
+				Name: getProvisionerName(pn),
 			},
 		},
 	}
@@ -562,6 +595,16 @@
 	return c
 }
 
+// getProvisionerName returns the default provisioner name (the ccpName constant)
+// or, if specified, a custom provisioner name.
+func getProvisionerName(pn []string) string {
+	provisionerName := ccpName
+	if len(pn) != 0 {
+		provisionerName = pn[0]
+	}
+	return provisionerName
+}
+
 func makeChannelWithFinalizerAndAddress() *eventingv1alpha1.Channel {
 	c := makeChannelWithFinalizer()
 	c.Status.SetAddress(serviceAddress)
@@ -631,7 +674,7 @@ func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap {
 	return cm
 }
 
-func makeK8sService() *corev1.Service {
+func makeK8sService(pn ...string) *corev1.Service {
 	return &corev1.Service{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: "v1",
@@ -643,8 +686,8 @@
 			Labels: map[string]string{
 				util.EventingChannelLabel:        cName,
 				util.OldEventingChannelLabel:     cName,
-				util.EventingProvisionerLabel:    ccpName,
-				util.OldEventingProvisionerLabel: ccpName,
+				util.EventingProvisionerLabel:    getProvisionerName(pn),
+				util.OldEventingProvisionerLabel: getProvisionerName(pn),
 			},
 			OwnerReferences: []metav1.OwnerReference{
 				{
diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
index 15e861f8896..04e19ca8933 100644
--- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
+++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
@@ -35,9 +35,6 @@ import (
 )
 
 const (
-	// Name is the name of the in-memory channel ClusterChannelProvisioner.
-	Name = "in-memory-channel"
-
 	// Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1.
 	Channel = "Channel"
 
@@ -48,6 +45,11 @@ const (
 	k8sServiceDeleteFailed = "K8sServiceDeleteFailed"
 )
 
+var (
+	// provisionerNames contains the names of the provisioners served by this controller.
+	provisionerNames = []string{"in-memory-channel", "in-memory"}
+)
+
 type reconciler struct {
 	client   client.Client
 	recorder record.EventRecorder
@@ -130,7 +132,12 @@ func IsControlled(ref *corev1.ObjectReference) bool {
 // shouldReconcile determines if this Controller should control (and therefore reconcile) a given
 // ClusterChannelProvisioner. This Controller only handles in-memory channels.
 func shouldReconcile(namespace, name string) bool {
-	return namespace == "" && name == Name
+	for _, p := range provisionerNames {
+		if namespace == "" && name == p {
+			return true
+		}
+	}
+	return false
 }
 
 func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error {
diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
index 4ad88ddd2de..e65117c4df4 100644
--- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
+++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
@@ -42,6 +42,7 @@ const (
 	ccpUid           = "test-uid"
 	testErrorMessage = "test-induced-error"
 	testNS           = "test-ns"
+	Name             = "in-memory-channel"
 )
 
 var (
diff --git a/pkg/controller/eventing/inmemory/controller/main.go b/pkg/controller/eventing/inmemory/controller/main.go
index a16db0beb94..cae87d8f318 100644
--- a/pkg/controller/eventing/inmemory/controller/main.go
+++ b/pkg/controller/eventing/inmemory/controller/main.go
@@ -35,7 +35,7 @@ func main() {
 	logger := provisioners.NewProvisionerLoggerFromConfig(logConfig)
 	defer logger.Sync()
 	logger = logger.With(
-		zap.String("eventing.knative.dev/clusterChannelProvisioner", clusterchannelprovisioner.Name),
+		zap.String("eventing.knative.dev/clusterChannelProvisioner", "in-memory"),
 		zap.String("eventing.knative.dev/clusterChannelProvisionerComponent", "Controller"),
 	)
 	flag.Parse()
diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go
index dd60be7fed0..02d60d40bf6 100644
--- a/pkg/sidecar/fanout/fanout_handler.go
+++ b/pkg/sidecar/fanout/fanout_handler.go
@@ -40,6 +40,9 @@ const (
 // Configuration for a fanout.Handler.
 type Config struct {
 	Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"`
+	// AsyncHandler controls whether the Subscriptions are called synchronously or asynchronously.
+	// It is expected to be false when used as a sidecar.
+	AsyncHandler bool `json:"asyncHandler,omitempty"`
 }
 
 // http.Handler that takes a single request in and fans it out to N other servers.
@@ -83,6 +86,13 @@ func NewHandler(logger *zap.Logger, config Config) *Handler {
 func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
 	return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
+		if f.config.AsyncHandler {
+			go func() {
+				// Any returned error is already logged in f.dispatch().
+				_ = f.dispatch(m)
+			}()
+			return nil
+		}
 		return f.dispatch(m)
 	}
 }
diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go
index 6702984154d..1b8819bdc0b 100644
--- a/pkg/sidecar/fanout/fanout_handler_test.go
+++ b/pkg/sidecar/fanout/fanout_handler_test.go
@@ -64,6 +64,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
 		subscriber     func(http.ResponseWriter, *http.Request)
 		channel        func(http.ResponseWriter, *http.Request)
 		expectedStatus int
+		asyncHandler   bool
 	}{
 		"rejected by receiver": {
 			receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error {
@@ -178,6 +179,28 @@
 			},
 			expectedStatus: http.StatusAccepted,
 		},
+		"all subs succeed with async handler": {
+			subs: []eventingduck.ChannelSubscriberSpec{
+				{
+					SubscriberURI: replaceSubscriber,
+					ReplyURI:      replaceChannel,
+				},
+				{
+					SubscriberURI: replaceSubscriber,
+					ReplyURI:      replaceChannel,
+				},
+				{
+					SubscriberURI: replaceSubscriber,
+					ReplyURI:      replaceChannel,
+				},
+			},
+			subscriber: callableSucceed,
+			channel: func(writer http.ResponseWriter, _ *http.Request) {
+				writer.WriteHeader(http.StatusAccepted)
+			},
+			expectedStatus: http.StatusAccepted,
+			asyncHandler:   true,
+		},
 	}
 	for n, tc := range testCases {
 		if n != "fanout times out" {
@@ -206,6 +229,9 @@
 			}
 			h := NewHandler(zap.NewNop(), Config{Subscriptions: subs})
+			if tc.asyncHandler {
+				h.config.AsyncHandler = true
+			}
 			if tc.receiverFunc != nil {
 				h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar())
 			}
diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh
index b8381d5a0c1..31fa6feca0f 100755
--- a/test/e2e-tests.sh
+++ b/test/e2e-tests.sh
@@ -85,6 +85,24 @@ $(dirname $0)/upload-test-images.sh e2e || fail_test "Error uploading test image
 # Setup resources common to all eventing tests
 setup_events_test_resources || fail_test "Error setting up test resources"
 
-go_test_e2e ./test/e2e || fail_test
+go_test_e2e ./test/e2e
+exit_result=$?
+if [ ${exit_result} -ne 0 ]; then
+  # Collect logs from all Knative Eventing pods
+  echo "============================================================"
+  for namespace in "knative-eventing" "e2etestfn3"; do
+    for pod in $(kubectl get pod -n $namespace | grep Running | awk '{print $1}'); do
+      for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do
+        echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}"
+        kubectl logs -n $namespace "${pod}" -c "${container}" || true
+        echo "----------------------------------------------------------"
+        echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}"
+        kubectl logs -p -n $namespace "${pod}" -c "${container}" || true
+        echo "============================================================"
+      done
+    done
+  done
+  fail_test
+fi
 
 success
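Note on the fanout change: the `AsyncHandler` flag is the behavioral core of this patch. When it is set (i.e. for the new `in-memory` provisioner), the receiver acknowledges the sender before dispatch to the subscribers completes, so delivery errors can only be logged, not surfaced to the caller; the legacy `in-memory-channel` path stays synchronous. Below is a minimal, self-contained sketch of that sync-vs-async switch. `AsyncHandler` mirrors the real field added to `fanout.Config`; `Handler`, `receive`, and the string message are simplified stand-ins for the actual `fanout.Handler`/`provisioners` types, not the Knative API.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the relevant part of fanout.Config: when AsyncHandler is
// true, subscribers are dispatched in a goroutine and the receiver returns
// immediately, without waiting for (or reporting errors from) the fanout.
type Config struct {
	AsyncHandler bool
}

// Handler is a stripped-down stand-in for fanout.Handler.
type Handler struct {
	config   Config
	dispatch func(msg string) error // stands in for (*Handler).dispatch
}

// receive mirrors createReceiverFunction in the diff above.
func (h *Handler) receive(msg string) error {
	if h.config.AsyncHandler {
		go func() {
			// The caller has already been answered; errors can only be logged.
			if err := h.dispatch(msg); err != nil {
				fmt.Println("async dispatch failed:", err)
			}
		}()
		return nil
	}
	// Synchronous path: the caller sees dispatch errors directly.
	return h.dispatch(msg)
}

func main() {
	h := &Handler{
		config: Config{AsyncHandler: true},
		dispatch: func(msg string) error {
			time.Sleep(50 * time.Millisecond) // simulate slow subscribers
			fmt.Println("dispatched:", msg)
			return nil
		},
	}
	_ = h.receive("hello") // returns immediately on the async path
	fmt.Println("receive returned before dispatch finished")
	time.Sleep(100 * time.Millisecond) // give the goroutine time to finish
}
```

The same trade-off explains the `cCopy` trick in reconcile.go: both provisioners share one dispatcher Service, so the Channel is temporarily faked as belonging to the default provisioner purely to derive that Service's name, while the per-channel fanout config decides sync vs. async delivery.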