diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index ea0094fba73..5317d44320c 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -42,10 +42,11 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.uber.org/zap" - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) var ( diff --git a/cmd/controller/main.go b/cmd/controller/main.go index a508d7092a9..82851fc1bce 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -24,20 +24,13 @@ import ( "os" "time" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/logconfig" "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker" "github.com/knative/eventing/pkg/reconciler/v1alpha1/channel" "github.com/knative/eventing/pkg/reconciler/v1alpha1/namespace" "github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription" "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/manager" - - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). 
- _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/logconfig" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/configmap" "github.com/knative/pkg/logging" @@ -46,9 +39,14 @@ import ( "github.com/knative/pkg/system" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" controllerruntime "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) const ( diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 59e8ce8892b..370289ffb5e 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -25,49 +25,47 @@ import ( "fmt" "log" "net/http" - "strings" "time" - "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" - "github.com/knative/eventing/pkg/sidecar/configmap/watcher" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/channelwatcher" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "github.com/knative/eventing/pkg/sidecar/swappable" - "github.com/knative/eventing/pkg/utils" - "github.com/knative/pkg/system" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "k8s.io/client-go/kubernetes" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" 
"sigs.k8s.io/controller-runtime/pkg/runtime/signals" -) - -const ( - defaultConfigMapName = "in-memory-channel-dispatcher-config-map" - - // The following are the only valid values of the config_map_noticer flag. - cmnfVolume = "volume" - cmnfWatcher = "watcher" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) var ( readTimeout = 1 * time.Minute writeTimeout = 1 * time.Minute - port int - configMapNoticer string - configMapNamespace string - configMapName string + port int + channelProvisioners listFlags ) -func init() { - flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") - flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerValues())) - flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace(), "The namespace of the ConfigMap that is watched for configuration.") - flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.") +type listFlags []string + +func (l *listFlags) String() string { + return "" +} +func (l *listFlags) Set(value string) error { + *l = append(*l, value) + return nil } -func configMapNoticerValues() string { - return strings.Join([]string{cmnfVolume, cmnfWatcher}, ", ") +func init() { + flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") + flag.Var(&channelProvisioners, "channel_provisioner", "The provisioner of the channels that will be watched.") } func main() { @@ -84,14 +82,18 @@ func main() { logger.Fatal("--sidecar_port flag must be set") } + if len(channelProvisioners) < 1 { + logger.Fatal("--channel_provisioner must be specified") + } + sh, err := swappable.NewEmptyHandler(logger) if err != nil { logger.Fatal("Unable to create swappable.Handler", zap.Error(err)) } - mgr, err := 
setupConfigMapNoticer(logger, sh.UpdateConfig) + mgr, err := setupChannelWatcher(logger, sh.UpdateConfig) if err != nil { - logger.Fatal("Unable to create configMap noticer.", zap.Error(err)) + logger.Fatal("Unable to create channel watcher.", zap.Error(err)) } s := &http.Server{ @@ -125,57 +127,87 @@ func main() { } } -func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) { +func setupChannelWatcher(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) { mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { - logger.Error("Error starting manager.", zap.Error(err)) + logger.Error("Error creating new manager.", zap.Error(err)) return nil, err } - - switch configMapNoticer { - case cmnfVolume: - err = setupConfigMapVolume(logger, mgr, configUpdated) - case cmnfWatcher: - err = setupConfigMapWatcher(logger, mgr, configUpdated) - default: - err = fmt.Errorf("need to provide the --config_map_noticer flag (valid values are %s)", configMapNoticerValues()) - } - if err != nil { + if err = v1alpha1.AddToScheme(mgr.GetScheme()); err != nil { + logger.Error("Error while adding eventing scheme to manager.", zap.Error(err)) return nil, err } + channelwatcher.New(mgr, logger, updateChannelConfig(configUpdated)) return mgr, nil } -func setupConfigMapVolume(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error { - cmn, err := filesystem.NewConfigMapWatcher(logger, filesystem.ConfigDir, configUpdated) - if err != nil { - logger.Error("Unable to create filesystem.ConifgMapWatcher", zap.Error(err)) - return err - } - if err = mgr.Add(cmn); err != nil { - logger.Error("Unable to add the config map watcher", zap.Error(err)) - return err +func updateChannelConfig(updateConfig swappable.UpdateConfig) channelwatcher.WatchHandlerFunc { + return func(ctx context.Context, c client.Client, chanNamespacedName types.NamespacedName) error { + channels, 
err := listAllChannels(ctx, c) + if err != nil { + logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err)) + return err + } + config := multiChannelFanoutConfig(channels) + return updateConfig(config) } - return nil } -func setupConfigMapWatcher(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error { - kc, err := kubernetes.NewForConfig(mgr.GetConfig()) - if err != nil { - return err +func listAllChannels(ctx context.Context, c client.Client) ([]v1alpha1.Channel, error) { + channels := make([]v1alpha1.Channel, 0) + for { + cl := &v1alpha1.ChannelList{} + opts := &client.ListOptions{ + // Set Raw because if we need to get more than one page, then we will put the continue token + // into opts.Raw.Continue. + Raw: &metav1.ListOptions{}, + } + if err := c.List(ctx, opts, cl); err != nil { + return nil, err + } + for _, c := range cl.Items { + if c.Status.IsReady() && shouldWatch(&c) { + channels = append(channels, c) + } + } + if cl.Continue != "" { + opts.Raw.Continue = cl.Continue + } else { + return channels, nil + } } +} - cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, configUpdated) - if err != nil { - return err +func shouldWatch(ch *v1alpha1.Channel) bool { + if ch.Spec.Provisioner != nil && ch.Spec.Provisioner.Namespace == "" { + for _, v := range channelProvisioners { + if v == ch.Spec.Provisioner.Name { + return true + } + } } + return false +} - if err = mgr.Add(utils.NewBlockingStart(logger, cmw)); err != nil { - logger.Error("Unable to add the config map watcher", zap.Error(err)) - return err +func multiChannelFanoutConfig(channels []v1alpha1.Channel) *multichannelfanout.Config { + cc := make([]multichannelfanout.ChannelConfig, 0) + for _, c := range channels { + channelConfig := multichannelfanout.ChannelConfig{ + Namespace: c.Namespace, + Name: c.Name, + HostName: c.Status.Address.Hostname, + } + if c.Spec.Subscribable != nil { + channelConfig.FanoutConfig = fanout.Config{ + 
Subscriptions: c.Spec.Subscribable.Subscribers, + } + } + cc = append(cc, channelConfig) + } + return &multichannelfanout.Config{ + ChannelConfigs: cc, } - return nil } // runnableServer is a small wrapper around http.Server so that it matches the manager.Runnable diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index e0191da4081..d1b30298273 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -62,7 +62,6 @@ rules: - apiGroups: - "" # Core API group. resources: - - configmaps - services verbs: - get @@ -83,24 +82,6 @@ rules: - services verbs: - update - - apiGroups: - - "" # Core API Group. - resources: - - configmaps - resourceNames: - - in-memory-channel-dispatcher-config-map - verbs: - - update - - apiGroups: - - networking.istio.io - resources: - - virtualservices - verbs: - - get - - list - - watch - - create - - update - apiGroups: - "" # Core API Group. resources: @@ -168,9 +149,10 @@ metadata: name: in-memory-channel-dispatcher rules: - apiGroups: - - "" # Core API group. + - "eventing.knative.dev" resources: - - configmaps + - "channels" + - "channels/status" verbs: - get - list @@ -216,24 +198,10 @@ spec: image: github.com/knative/eventing/cmd/fanoutsidecar args: - --sidecar_port=8080 - - --config_map_noticer=watcher - - --config_map_namespace=knative-eventing - - --config_map_name=in-memory-channel-dispatcher-config-map + - --channel_provisioner=in-memory + - --channel_provisioner=in-memory-channel env: - name: SYSTEM_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace - ---- - -# Create the ConfigMap, because if we don't the dispatcher will flap when it first comes online and -# this can cause the integration tests to fail. 
- -apiVersion: v1 -kind: ConfigMap -metadata: - name: in-memory-channel-dispatcher-config-map - namespace: knative-eventing -data: - multiChannelFanoutConfig: '{}' diff --git a/contrib/kafka/cmd/controller/main.go b/contrib/kafka/cmd/controller/main.go index 37e45a43349..375361f4af3 100644 --- a/contrib/kafka/cmd/controller/main.go +++ b/contrib/kafka/cmd/controller/main.go @@ -4,20 +4,20 @@ import ( "flag" "os" + provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" + "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" - - provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" - "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" - eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) // SchemeFunc adds types to a Scheme. 
diff --git a/contrib/kafka/main.go b/contrib/kafka/main.go index 316f2dbd521..ed98481c20b 100644 --- a/contrib/kafka/main.go +++ b/contrib/kafka/main.go @@ -4,20 +4,20 @@ import ( "flag" "os" + provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" + "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" - - provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" - "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" - eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). 
+ // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) const ( diff --git a/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go b/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go index e2f043d9231..57a70ade635 100644 --- a/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go +++ b/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go @@ -254,7 +254,7 @@ func makeK8sService() *corev1.Service { Selector: provisioners.DispatcherLabels(Name), Ports: []corev1.ServicePort{ { - Name: "http", + Protocol: corev1.ProtocolTCP, Port: 80, TargetPort: intstr.FromInt(8080), }, diff --git a/pkg/channelwatcher/channel_watcher.go b/pkg/channelwatcher/channel_watcher.go new file mode 100644 index 00000000000..b9a77670ce8 --- /dev/null +++ b/pkg/channelwatcher/channel_watcher.go @@ -0,0 +1,59 @@ +package channelwatcher + +import ( + "context" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/logging" + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type WatchHandlerFunc func(context.Context, client.Client, types.NamespacedName) error + +type reconciler struct { + client client.Client + logger *zap.Logger + handler WatchHandlerFunc +} + +func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + ctx := logging.WithLogger(context.TODO(), r.logger.With(zap.Any("request", req))) + logging.FromContext(ctx).Info("New update for channel.") + if err := r.handler(ctx, r.client, req.NamespacedName); err != nil { + logging.FromContext(ctx).Error("WatchHandlerFunc returned error", zap.Error(err)) + return reconcile.Result{}, err + } + return 
reconcile.Result{}, nil +} + +func New(mgr manager.Manager, logger *zap.Logger, watchHandler WatchHandlerFunc) error { + c, err := controller.New("ChannelWatcher", mgr, controller.Options{ + Reconciler: &reconciler{ + client: mgr.GetClient(), + logger: logger, + handler: watchHandler, + }, + }) + if err != nil { + logger.Error("Unable to create controller for channelwatcher.", zap.Error(err)) + return err + } + + // Watch Channels. + err = c.Watch(&source.Kind{ + Type: &v1alpha1.Channel{}, + }, &handler.EnqueueRequestForObject{}) + if err != nil { + logger.Error("Unable to watch Channels.", zap.Error(err), zap.Any("type", &v1alpha1.Channel{})) + return err + } + return nil +} diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index a4261fe8a1e..497dae5adc8 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -41,9 +41,14 @@ const ( // AddFinalizerResult is used indicate whether a finalizer was added or already present. type AddFinalizerResult bool +// RemoveFinalizerResult is used to indicate whether a finalizer was found and removed (FinalizerRemoved), or finalizer not found (FinalizerNotFound). +type RemoveFinalizerResult bool + const ( - FinalizerAlreadyPresent AddFinalizerResult = false - FinalizerAdded AddFinalizerResult = true + FinalizerAlreadyPresent AddFinalizerResult = false + FinalizerAdded AddFinalizerResult = true + FinalizerRemoved RemoveFinalizerResult = true + FinalizerNotFound RemoveFinalizerResult = false ) // AddFinalizer adds finalizerName to the Object. @@ -57,25 +62,47 @@ func AddFinalizer(o metav1.Object, finalizerName string) AddFinalizerResult { return FinalizerAdded } -func RemoveFinalizer(o metav1.Object, finalizerName string) { +func RemoveFinalizer(o metav1.Object, finalizerName string) RemoveFinalizerResult { + result := FinalizerNotFound finalizers := sets.NewString(o.GetFinalizers()...) 
- finalizers.Delete(finalizerName) - o.SetFinalizers(finalizers.List()) + if finalizers.Has(finalizerName) { + result = FinalizerRemoved + finalizers.Delete(finalizerName) + o.SetFinalizers(finalizers.List()) + } + return result +} + +// K8sServiceOption is a functional option that can modify the K8s Service in CreateK8sService +type K8sServiceOption func(*corev1.Service) error + +// ExternalService is a functional option for CreateK8sService to create a K8s service of type ExternalName. +func ExternalService(c *eventingv1alpha1.Channel) K8sServiceOption { + return func(svc *corev1.Service) error { + svc.Spec = corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: names.ServiceHostName(channelDispatcherServiceName(c.Spec.Provisioner.Name), system.Namespace()), + } + return nil + } } -func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel, opts ...K8sServiceOption) (*corev1.Service, error) { getSvc := func() (*corev1.Service, error) { return getK8sService(ctx, client, c) } - return createK8sService(ctx, client, getSvc, newK8sService(c)) + svc, err := newK8sService(c, opts...) + if err != nil { + return nil, err + } + return createK8sService(ctx, client, getSvc, svc) } func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { list := &corev1.ServiceList{} opts := &runtimeClient.ListOptions{ - Namespace: c.Namespace, - // TODO After the full release start selecting on new set of labels by using k8sServiceLabels(c) - LabelSelector: labels.SelectorFromSet(k8sOldServiceLabels(c)), + Namespace: c.Namespace, + LabelSelector: labels.SelectorFromSet(k8sServiceLabels(c)), // Set Raw because if we need to get more than one page, then we will put the continue token // into opts.Raw.Continue. 
Raw: &metav1.ListOptions{}, @@ -107,12 +134,17 @@ func createK8sService(ctx context.Context, client runtimeClient.Client, getSvc g } else if err != nil { return nil, err } - // spec.clusterIP is immutable and is set on existing services. If we don't set this // to the same value, we will encounter an error while updating. - svc.Spec.ClusterIP = current.Spec.ClusterIP + if svc.Spec.Type != corev1.ServiceTypeExternalName { + svc.Spec.ClusterIP = current.Spec.ClusterIP + } if !equality.Semantic.DeepDerivative(svc.Spec, current.Spec) || - !expectedLabelsPresent(current.ObjectMeta.Labels, svc.ObjectMeta.Labels) { + !expectedLabelsPresent(current.ObjectMeta.Labels, svc.ObjectMeta.Labels) || + // This DeepEqual is necessary to force update dispatcher services when upgrading from 0.5 to 0.6. + // Above DeepDerivative will not work because we have removed an optional field (name) from ports + // TODO: Remove this check in 0.7+ + !equality.Semantic.DeepEqual(svc.Spec.Ports, current.Spec.Ports) { current.Spec = svc.Spec current.ObjectMeta.Labels = addExpectedLabels(current.ObjectMeta.Labels, svc.ObjectMeta.Labels) err = client.Update(ctx, current) @@ -248,8 +280,9 @@ func UpdateChannel(ctx context.Context, client runtimeClient.Client, u *eventing // newK8sService creates a new Service for a Channel resource. It also sets the appropriate // OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it. // As well as being garbage collected when the Channel is deleted. 
-func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { - return &corev1.Service{ +func newK8sService(c *eventingv1alpha1.Channel, opts ...K8sServiceOption) (*corev1.Service, error) { + // Add annotations + svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ GenerateName: channelServiceName(c.ObjectMeta.Name), Namespace: c.Namespace, @@ -265,12 +298,19 @@ func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: PortName, - Port: PortNumber, + Name: PortName, + Protocol: corev1.ProtocolTCP, + Port: PortNumber, }, }, }, } + for _, opt := range opts { + if err := opt(svc); err != nil { + return nil, err + } + } + return svc, nil } // k8sOldServiceLabels returns a map with only old eventing channel and provisioner labels diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 6aded3735d3..848dda5d8f3 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -404,6 +404,35 @@ func TestAddFinalizer(t *testing.T) { } } +func TestRemoveFinalizer(t *testing.T) { + testCases := map[string]struct { + expected RemoveFinalizerResult + }{ + "Finalizer not found": { + expected: false, + }, + "Finalizer removed successfully": { + expected: true, + }, + } + finalizer := "test-finalizer" + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + c := getNewChannel() + if tc.expected { + c.Finalizers = []string{finalizer} + } else { + c.Finalizers = []string{} + } + actual := RemoveFinalizer(c, finalizer) + + if diff := cmp.Diff(actual, tc.expected); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + }) + } +} + func TestChannelNames(t *testing.T) { testCases := []struct { Name string @@ -597,8 +626,9 @@ func makeK8sService() *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: PortName, - Port: PortNumber, + Name: PortName, + Port: PortNumber, + Protocol: 
corev1.ProtocolTCP, }, }, }, diff --git a/pkg/provisioners/inmemory/channel/controller.go b/pkg/provisioners/inmemory/channel/controller.go index 7ff6128759a..88f0e96233f 100644 --- a/pkg/provisioners/inmemory/channel/controller.go +++ b/pkg/provisioners/inmemory/channel/controller.go @@ -18,11 +18,8 @@ package channel import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" - "github.com/knative/pkg/system" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -33,18 +30,6 @@ const ( // controllerAgentName is the string used by this controller to identify // itself when creating events. controllerAgentName = "in-memory-channel-controller" - - // ConfigMapName is the name of the ConfigMap in the knative-eventing namespace that contains - // the subscription information for all in-memory Channels. The Provisioner writes to it and the - // Dispatcher reads from it. - ConfigMapName = "in-memory-channel-dispatcher-config-map" -) - -var ( - defaultConfigMapKey = types.NamespacedName{ - Namespace: system.Namespace(), - Name: ConfigMapName, - } ) // ProvideController returns a Controller that represents the in-memory-channel Provisioner. @@ -52,9 +37,8 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont // Setup a new controller to Reconcile Channels that belong to this Cluster Provisioner // (in-memory channels). 
r := &reconciler{ - configMapKey: defaultConfigMapKey, - recorder: mgr.GetRecorder(controllerAgentName), - logger: logger, + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, } c, err := controller.New(controllerAgentName, mgr, controller.Options{ Reconciler: r, @@ -82,14 +66,5 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont return nil, err } - // Watch the VirtualServices that are owned by Channels. - err = c.Watch(&source.Kind{ - Type: &istiov1alpha3.VirtualService{}, - }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) - if err != nil { - logger.Error("Unable to watch VirtualServices.", zap.Error(err)) - return nil, err - } - return c, nil } diff --git a/pkg/provisioners/inmemory/channel/reconcile.go b/pkg/provisioners/inmemory/channel/reconcile.go index 53ed753244e..5d5f8392a82 100644 --- a/pkg/provisioners/inmemory/channel/reconcile.go +++ b/pkg/provisioners/inmemory/channel/reconcile.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -32,19 +31,14 @@ import ( util "github.com/knative/eventing/pkg/provisioners" ccpcontroller "github.com/knative/eventing/pkg/provisioners/inmemory/clusterchannelprovisioner" "github.com/knative/eventing/pkg/reconciler/names" - "github.com/knative/eventing/pkg/sidecar/configmap" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" ) const ( finalizerName = controllerAgentName // Name of the corev1.Events emitted from the reconciliation process - channelReconciled = "ChannelReconciled" - channelUpdateStatusFailed = "ChannelUpdateStatusFailed" - channelConfigSyncFailed = "ChannelConfigSyncFailed" - k8sServiceCreateFailed = "K8sServiceCreateFailed" - virtualServiceCreateFailed = 
"VirtualServiceCreateFailed" + channelReconciled = "ChannelReconciled" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" + k8sServiceCreateFailed = "K8sServiceCreateFailed" // TODO after in-memory-channel is retired, asyncProvisionerName should be removed defaultProvisionerName = "in-memory-channel" ) @@ -53,8 +47,6 @@ type reconciler struct { client client.Client recorder record.EventRecorder logger *zap.Logger - - configMapKey client.ObjectKey } // Verify the struct implements reconcile.Reconciler @@ -93,8 +85,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } logger.Info("Reconciling Channel") - // Modify a copy, not the original. - c = c.DeepCopy() + // Finalizer needs to be removed (even though no finalizers are added) to maintain backwards compatibility + // with v0.5 in which a finalizer was added. Or else channels will not get deleted after upgrading to 0.6+ + if result := util.RemoveFinalizer(c, finalizerName); result == util.FinalizerRemoved { + r.client.Update(ctx, c) + logger.Info("Channel reconciled. Finalizer Removed") + r.recorder.Eventf(c, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %q. 
Finalizer removed.", c.Name) + return reconcile.Result{Requeue: true}, nil + } err = r.reconcile(ctx, c) if err != nil { @@ -106,7 +104,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err r.recorder.Eventf(c, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name) } - if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil { + if updateStatusErr := r.client.Status().Update(ctx, c); updateStatusErr != nil { logger.Info("Error updating Channel Status", zap.Error(updateStatusErr)) r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err) return reconcile.Result{}, updateStatusErr @@ -128,135 +126,20 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) c.Status.InitializeConditions() - // We are syncing three things: - // 1. The K8s Service to talk to this Channel. - // 2. The Istio VirtualService to talk to this Channel. - // 3. The configuration of all Channel subscriptions. - - // We always need to sync the Channel config, so do it first. - if err := r.syncChannelConfig(ctx); err != nil { - logger.Info("Error syncing the Channel config", zap.Error(err)) - r.recorder.Eventf(c, corev1.EventTypeWarning, channelConfigSyncFailed, "Failed to sync Channel config: %v", err) - return err - } - - if c.DeletionTimestamp != nil { - // K8s garbage collection will delete the K8s service and VirtualService for this channel. - // We use a finalizer to ensure the channel config has been synced. - util.RemoveFinalizer(c, finalizerName) - return nil - } - - util.AddFinalizer(c, finalizerName) - - svc, err := util.CreateK8sService(ctx, r.client, c) + // We are syncing K8s Service to talk to this Channel. 
+ svc, err := util.CreateK8sService(ctx, r.client, c, util.ExternalService(c)) if err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) return err } - c.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) - if c.Spec.Provisioner.Name == defaultProvisionerName { - _, err = util.CreateVirtualService(ctx, r.client, c, svc) - if err != nil { - logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) - r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) - return err - } - } else { - // We need to have a single dispatcher that is pointed at by _both_ - // ClusterChannelProvisioners. So fake the channel, by saying that its provisioner is the - // one with the single dispatcher. The faked provisioner is used only to determine the - // dispatcher Service's name. 
- cCopy := c.DeepCopy() - cCopy.Spec.Provisioner.Name = defaultProvisionerName - _, err = util.CreateVirtualService(ctx, r.client, cCopy, svc) - if err != nil { - logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) - r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) - return err - } - } + c.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) c.Status.MarkProvisioned() return nil } -func (r *reconciler) syncChannelConfig(ctx context.Context) error { - channels, err := r.listAllChannels(ctx) - if err != nil { - r.logger.Info("Unable to list channels", zap.Error(err)) - return err - } - config := multiChannelFanoutConfig(channels) - return r.writeConfigMap(ctx, config) -} - -func (r *reconciler) writeConfigMap(ctx context.Context, config *multichannelfanout.Config) error { - logger := r.logger.With(zap.Any("configMap", r.configMapKey)) - - updated, err := configmap.SerializeConfig(*config) - if err != nil { - r.logger.Error("Unable to serialize config", zap.Error(err), zap.Any("config", config)) - return err - } - - cm := &corev1.ConfigMap{} - err = r.client.Get(ctx, r.configMapKey, cm) - if errors.IsNotFound(err) { - cm = r.createNewConfigMap(updated) - err = r.client.Create(ctx, cm) - } - if err != nil { - logger.Info("Unable to get/create ConfigMap", zap.Error(err)) - return err - } - - if equality.Semantic.DeepEqual(cm.Data, updated) { - // Nothing to update. 
- return nil - } - - cm.Data = updated - return r.client.Update(ctx, cm) -} - -func (r *reconciler) createNewConfigMap(data map[string]string) *corev1.ConfigMap { - return &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: r.configMapKey.Namespace, - Name: r.configMapKey.Name, - }, - Data: data, - } -} - -func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannelfanout.Config { - cc := make([]multichannelfanout.ChannelConfig, 0) - for _, c := range channels { - channelConfig := multichannelfanout.ChannelConfig{ - Namespace: c.Namespace, - Name: c.Name, - } - if c.Spec.Subscribable != nil { - // TODO After in-memory-channel is retired, this logic must be refactored. - asyncHandler := false - if c.Spec.Provisioner.Name != defaultProvisionerName { - asyncHandler = true - } - channelConfig.FanoutConfig = fanout.Config{ - Subscriptions: c.Spec.Subscribable.Subscribers, - AsyncHandler: asyncHandler, - } - } - cc = append(cc, channelConfig) - } - return &multichannelfanout.Config{ - ChannelConfigs: cc, - } -} - func (r *reconciler) listAllChannels(ctx context.Context) ([]eventingv1alpha1.Channel, error) { channels := make([]eventingv1alpha1.Channel, 0) diff --git a/pkg/provisioners/inmemory/channel/reconcile_test.go b/pkg/provisioners/inmemory/channel/reconcile_test.go index 30b9b2ac27b..0b221854a41 100644 --- a/pkg/provisioners/inmemory/channel/reconcile_test.go +++ b/pkg/provisioners/inmemory/channel/reconcile_test.go @@ -18,27 +18,27 @@ package channel import ( "context" - "encoding/json" "errors" "fmt" "testing" - "github.com/google/go-cmp/cmp" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" - 
"github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "github.com/knative/eventing/pkg/utils" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/system" _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -56,8 +56,6 @@ const ( cmName = "test-config-map" testErrorMessage = "test induced error" - - insertedByVerifyConfigMapData = "data inserted by verifyConfigMapData so that it can be WantPresent" ) var ( @@ -183,11 +181,9 @@ var ( // map of events to set test cases' expectations easier events = map[string]corev1.Event{ - channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, - channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, - channelConfigSyncFailed: {Reason: channelConfigSyncFailed, Type: corev1.EventTypeWarning}, - k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, - virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning}, + channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, + channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, + k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, } ) @@ -251,98 +247,28 @@ func TestReconcile(t *testing.T) { }, }, { - Name: "Channel deleted - Channel config sync fails", - InitialState: []runtime.Object{ - makeDeletingChannel(), - }, - Mocks: controllertesting.Mocks{ - MockLists: errorListingChannels(), - }, - WantPresent: []runtime.Object{ - // Finalizer has not 
been removed. - makeDeletingChannel(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelConfigSyncFailed], - }, - }, - { - Name: "Channel deleted - finalizer removed", + Name: "Channel has finalizer (to test back compat with version <= 0.5, when finalizers were added", InitialState: []runtime.Object{ - makeDeletingChannel(), + makeChannelWithFinalizer(), }, WantPresent: []runtime.Object{ - makeDeletingChannelWithoutFinalizer(), - }, - WantEvent: []corev1.Event{ - events[channelReconciled], - }, - }, - { - Name: "Channel config sync fails - can't list Channels", - InitialState: []runtime.Object{ - makeChannel(), - }, - Mocks: controllertesting.Mocks{ - MockLists: errorListingChannels(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelConfigSyncFailed], - }, - }, - { - Name: "Channel config sync fails - can't get ConfigMap", - InitialState: []runtime.Object{ makeChannel(), }, - Mocks: controllertesting.Mocks{ - MockGets: errorGettingConfigMap(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelConfigSyncFailed], - }, - }, - { - Name: "Channel config sync fails - can't create ConfigMap", - InitialState: []runtime.Object{ - makeChannel(), - }, - Mocks: controllertesting.Mocks{ - MockCreates: errorCreatingConfigMap(), - }, - WantErrMsg: testErrorMessage, WantEvent: []corev1.Event{ - events[channelConfigSyncFailed], - }, - }, - { - Name: "Channel config sync fails - can't update ConfigMap", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - }, - Mocks: controllertesting.Mocks{ - MockUpdates: errorUpdatingConfigMap(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelConfigSyncFailed], + events[channelReconciled], }, + WantResult: reconcile.Result{Requeue: true}, }, { Name: "K8s service get fails", InitialState: []runtime.Object{ makeChannel(), - makeConfigMap(), }, Mocks: controllertesting.Mocks{ MockLists: 
errorListingK8sService(), }, WantPresent: []runtime.Object{ - makeChannelWithFinalizer(), + makeChannel(), }, WantErrMsg: testErrorMessage, WantEvent: []corev1.Event{ @@ -353,99 +279,24 @@ func TestReconcile(t *testing.T) { Name: "K8s service creation fails", InitialState: []runtime.Object{ makeChannel(), - makeConfigMap(), }, Mocks: controllertesting.Mocks{ MockCreates: errorCreatingK8sService(), }, WantPresent: []runtime.Object{ // TODO: This should have a useful error message saying that the K8s Service failed. - makeChannelWithFinalizer(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[k8sServiceCreateFailed], - }, - }, - { - Name: "Virtual service get fails", - InitialState: []runtime.Object{ makeChannel(), - makeConfigMap(), - makeK8sService(), - makeVirtualService(), - }, - Mocks: controllertesting.Mocks{ - MockLists: errorListingVirtualService(), - }, - WantPresent: []runtime.Object{ - // TODO: This should have a useful error message saying that the VirtualService - // failed. - makeChannelWithFinalizerAndAddress(), }, WantErrMsg: testErrorMessage, WantEvent: []corev1.Event{ - events[virtualServiceCreateFailed], - }, - }, - { - Name: "Virtual service creation fails", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - makeK8sService(), - }, - Mocks: controllertesting.Mocks{ - MockCreates: errorCreatingVirtualService(), - }, - WantPresent: []runtime.Object{ - // TODO: This should have a useful error message saying that the VirtualService - // failed. 
- makeChannelWithFinalizerAndAddress(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[virtualServiceCreateFailed], - }, - }, - { - Name: "Channel get for update fails", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - makeK8sService(), - makeVirtualService(), - }, - Mocks: controllertesting.Mocks{ - MockGets: errorOnSecondChannelGet(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelReconciled], events[channelUpdateStatusFailed], + events[k8sServiceCreateFailed], }, }, { - Name: "Channel update fails", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - makeK8sService(), - makeVirtualService(), - }, - Mocks: controllertesting.Mocks{ - MockUpdates: errorUpdatingChannel(), - }, - WantErrMsg: testErrorMessage, - WantEvent: []corev1.Event{ - events[channelReconciled], events[channelUpdateStatusFailed], - }, - }, { Name: "Channel status update fails", InitialState: []runtime.Object{ makeChannel(), - makeConfigMap(), makeK8sService(), - makeVirtualService(), }, Mocks: controllertesting.Mocks{ MockStatusUpdates: errorUpdatingChannelStatus(), @@ -454,83 +305,14 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{ events[channelReconciled], events[channelUpdateStatusFailed], }, - }, { - Name: "Channel reconcile successful - Channel list follows pagination", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - }, - Mocks: controllertesting.Mocks{ - MockLists: (&paginatedChannelsListStruct{channels: channels}).MockLists(), - // This is more accurate to be in WantPresent, but we need to check JSON equality, - // not string equality, so it can't be done in WantPresent. Instead, we verify - // during the update call, swapping out the data and WantPresent with that inserted - // data. 
- MockUpdates: verifyConfigMapData(channelsConfig), - }, - WantPresent: []runtime.Object{ - makeReadyChannel(), - makeK8sService(), - makeVirtualService(), - makeConfigMapWithVerifyConfigMapData(), - }, - WantEvent: []corev1.Event{ - events[channelReconciled], - }, - }, - { - Name: "Channel reconcile successful - Channel has no subscribers", - InitialState: []runtime.Object{ - makeChannel(), - makeConfigMap(), - }, - Mocks: controllertesting.Mocks{ - MockLists: (&paginatedChannelsListStruct{channels: []eventingv1alpha1.Channel{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "high-consul", - Name: "duarte", - }, - Spec: eventingv1alpha1.ChannelSpec{ - Provisioner: &corev1.ObjectReference{ - Name: ccpName, - }, - }, - }, - }}).MockLists(), - // This is more accurate to be in WantPresent, but we need to check JSON equality, - // not string equality, so it can't be done in WantPresent. Instead, we verify - // during the update call, swapping out the data and WantPresent with that inserted - // data. 
- MockUpdates: verifyConfigMapData(multichannelfanout.Config{ - ChannelConfigs: []multichannelfanout.ChannelConfig{ - { - Namespace: "high-consul", - Name: "duarte", - }, - }, - }), - }, - WantPresent: []runtime.Object{ - makeReadyChannel(), - makeK8sService(), - makeVirtualService(), - makeConfigMapWithVerifyConfigMapData(), - }, - WantEvent: []corev1.Event{ - events[channelReconciled], - }, }, { Name: "Channel reconcile successful - Async channel", - // VirtualService should have channel provisioner name - // defaults to in-memory-channel but the service should match provisioner's service name InitialState: []runtime.Object{ makeChannel("in-memory"), }, Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ - makeVirtualService(), makeK8sService("in-memory"), }, WantEvent: []corev1.Event{ @@ -539,14 +321,11 @@ func TestReconcile(t *testing.T) { }, { Name: "Channel reconcile successful - Non Async channel", - // VirtualService should have channel provisioner name - // defaults to in-memory-channel InitialState: []runtime.Object{ makeChannel(), }, Mocks: controllertesting.Mocks{}, WantPresent: []runtime.Object{ - makeVirtualService(), makeK8sService(), }, WantEvent: []corev1.Event{ @@ -556,17 +335,12 @@ func TestReconcile(t *testing.T) { } for _, tc := range testCases { - configMapKey := types.NamespacedName{ - Namespace: cmNamespace, - Name: cmName, - } c := tc.GetClient() recorder := tc.GetEventRecorder() r := &reconciler{ - client: c, - recorder: recorder, - logger: zap.NewNop(), - configMapKey: configMapKey, + client: c, + recorder: recorder, + logger: zap.NewNop(), } if tc.ReconcileKey == "" { tc.ReconcileKey = fmt.Sprintf("/%s", cName) @@ -607,19 +381,6 @@ func getProvisionerName(pn []string) string { return provisionerName } -func makeChannelWithFinalizerAndAddress() *eventingv1alpha1.Channel { - c := makeChannelWithFinalizer() - c.Status.SetAddress(serviceAddress) - return c -} - -func makeReadyChannel() *eventingv1alpha1.Channel { - // Ready 
channels have the finalizer and are Addressable. - c := makeChannelWithFinalizerAndAddress() - c.Status.MarkProvisioned() - return c -} - func makeChannelNilProvisioner() *eventingv1alpha1.Channel { c := makeChannel() c.Spec.Provisioner = nil @@ -644,38 +405,6 @@ func makeChannelWithFinalizer() *eventingv1alpha1.Channel { return c } -func makeDeletingChannel() *eventingv1alpha1.Channel { - c := makeChannelWithFinalizer() - c.DeletionTimestamp = &deletionTime - return c -} - -func makeDeletingChannelWithoutFinalizer() *eventingv1alpha1.Channel { - c := makeDeletingChannel() - c.Finalizers = nil - return c -} - -func makeConfigMap() *corev1.ConfigMap { - return &corev1.ConfigMap{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "ConfigMap", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: cmNamespace, - Name: cmName, - }, - } -} - -func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap { - cm := makeConfigMap() - cm.Data = map[string]string{} - cm.Data[configmap.MultiChannelFanoutConfigKey] = insertedByVerifyConfigMapData - return cm -} - func makeK8sService(pn ...string) *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ @@ -703,60 +432,8 @@ func makeK8sService(pn ...string) *corev1.Service { }, }, Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Name: util.PortName, - Port: util.PortNumber, - }, - }, - }, - } -} - -func makeVirtualService() *istiov1alpha3.VirtualService { - return &istiov1alpha3.VirtualService{ - TypeMeta: metav1.TypeMeta{ - APIVersion: istiov1alpha3.SchemeGroupVersion.String(), - Kind: "VirtualService", - }, - ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("%s-channel-", cName), - Namespace: cNamespace, - Labels: map[string]string{ - util.EventingChannelLabel: cName, - util.OldEventingChannelLabel: cName, - util.EventingProvisionerLabel: ccpName, - util.OldEventingProvisionerLabel: ccpName, - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: 
eventingv1alpha1.SchemeGroupVersion.String(), - Kind: "Channel", - Name: cName, - UID: cUID, - Controller: &truePointer, - BlockOwnerDeletion: &truePointer, - }, - }, - }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - serviceAddress, - fmt.Sprintf("%s.%s.channels.%s", cName, cNamespace, utils.GetClusterDomainName()), - }, - HTTP: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: fmt.Sprintf("%s.%s.channels.%s", cName, cNamespace, utils.GetClusterDomainName()), - }, - Route: []istiov1alpha3.DestinationWeight{{ - Destination: istiov1alpha3.Destination{ - Host: "in-memory-channel-dispatcher.knative-testing.svc." + utils.GetClusterDomainName(), - Port: istiov1alpha3.PortSelector{ - Number: util.PortNumber, - }, - }}, - }}, - }, + ExternalName: names.ServiceHostName(fmt.Sprintf("%s-dispatcher", getProvisionerName(pn)), system.Namespace()), + Type: corev1.ServiceTypeExternalName, }, } } @@ -780,18 +457,6 @@ func errorGettingChannel() []controllertesting.MockGet { }, } } - -func errorGettingConfigMap() []controllertesting.MockGet { - return []controllertesting.MockGet{ - func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*corev1.ConfigMap); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorListingK8sService() []controllertesting.MockList { return []controllertesting.MockList{ func(_ client.Client, _ context.Context, _ *client.ListOptions, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -803,17 +468,6 @@ func errorListingK8sService() []controllertesting.MockList { } } -func errorListingVirtualService() []controllertesting.MockList { - return []controllertesting.MockList{ - func(_ client.Client, _ context.Context, _ *client.ListOptions, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := 
obj.(*istiov1alpha3.VirtualServiceList); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorListingChannels() []controllertesting.MockList { return []controllertesting.MockList{ func(client.Client, context.Context, *client.ListOptions, runtime.Object) (controllertesting.MockHandled, error) { @@ -822,17 +476,6 @@ func errorListingChannels() []controllertesting.MockList { } } -func errorCreatingConfigMap() []controllertesting.MockCreate { - return []controllertesting.MockCreate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*corev1.ConfigMap); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorCreatingK8sService() []controllertesting.MockCreate { return []controllertesting.MockCreate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -844,17 +487,6 @@ func errorCreatingK8sService() []controllertesting.MockCreate { } } -func errorCreatingVirtualService() []controllertesting.MockCreate { - return []controllertesting.MockCreate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*istiov1alpha3.VirtualService); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - func errorUpdatingChannel() []controllertesting.MockUpdate { return []controllertesting.MockUpdate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -877,17 +509,6 @@ func errorUpdatingChannelStatus() []controllertesting.MockStatusUpdate { } } -func errorUpdatingConfigMap() []controllertesting.MockUpdate { - return []controllertesting.MockUpdate{ - func(_ client.Client, _ context.Context, obj 
runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*corev1.ConfigMap); ok { - return controllertesting.Handled, errors.New(testErrorMessage) - } - return controllertesting.Unhandled, nil - }, - } -} - type paginatedChannelsListStruct struct { channels []eventingv1alpha1.Channel } @@ -911,28 +532,3 @@ func (p *paginatedChannelsListStruct) MockLists() []controllertesting.MockList { }, } } - -func verifyConfigMapData(expected multichannelfanout.Config) []controllertesting.MockUpdate { - return []controllertesting.MockUpdate{ - func(innerClient client.Client, ctx context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if cm, ok := obj.(*corev1.ConfigMap); ok { - s := cm.Data[configmap.MultiChannelFanoutConfigKey] - c := multichannelfanout.Config{} - err := json.Unmarshal([]byte(s), &c) - if err != nil { - return controllertesting.Handled, - fmt.Errorf("test is unable to unmarshal ConfigMap data: %v", err) - } - if diff := cmp.Diff(c, expected); diff != "" { - return controllertesting.Handled, - fmt.Errorf("test got unwanted ChannelsConfig (-want +got) %s", diff) - } - // Verified it is correct, now so that we can verify this actually occurred, swap - // out the data with a known value for later comparison. 
- cm.Data[configmap.MultiChannelFanoutConfigKey] = insertedByVerifyConfigMapData - return controllertesting.Handled, innerClient.Update(ctx, obj) - } - return controllertesting.Unhandled, nil - }, - } -} diff --git a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go index 5e79fc3c802..678c544d46a 100644 --- a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -153,7 +154,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste return nil } - svc, err := util.CreateDispatcherService(ctx, r.client, ccp) + svc, err := util.CreateDispatcherService(ctx, r.client, ccp, setDispatcherServiceSelector()) if err != nil { logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err)) @@ -179,6 +180,15 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste return nil } +// Since there are two provisioners "in-memory" and "in-memory-channel" but one single dispatcher service deployment, +// update the label of the K8s service to always point at the same dispatcher service deployment +func setDispatcherServiceSelector() util.ServiceOption { + return func(svc *v1.Service) error { + svc.Spec.Selector = util.DispatcherLabels("in-memory-channel") + return nil + } +} + func (r *reconciler) deleteOldDispatcherService(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error { svcName := fmt.Sprintf("%s-clusterbus", ccp.Name) svcKey := types.NamespacedName{ diff --git a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go 
index e4ff44abb9b..9d8934d0c61 100644 --- a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go @@ -40,10 +40,11 @@ import ( ) const ( - ccpUID = "test-uid" - testErrorMessage = "test-induced-error" - testNS = "test-ns" - Name = "in-memory-channel" + ccpUID = "test-uid" + testErrorMessage = "test-induced-error" + testNS = "test-ns" + inMemoryChannelName = "in-memory-channel" + inMemoryName = "in-memory" ) var ( @@ -96,7 +97,7 @@ func TestIsControlled(t *testing.T) { "wrong namespace": { ref: &corev1.ObjectReference{ Namespace: "other", - Name: Name, + Name: inMemoryName, }, isControlled: false, }, @@ -108,7 +109,7 @@ func TestIsControlled(t *testing.T) { }, "is controlled": { ref: &corev1.ObjectReference{ - Name: Name, + Name: inMemoryName, }, isControlled: true, }, @@ -143,7 +144,7 @@ func TestReconcile(t *testing.T) { &eventingv1alpha1.ClusterChannelProvisioner{ ObjectMeta: metav1.ObjectMeta{ Namespace: "not empty string", - Name: Name, + Name: inMemoryName, }, }, }, @@ -240,6 +241,20 @@ func TestReconcile(t *testing.T) { events[ccpReconciled], }, }, + { + Name: "Create dispatcher succeeds - in-memory-Channel", + ReconcileKey: inMemoryChannelName, + InitialState: []runtime.Object{ + makeClusterChannelProvisionerOld(), + }, + WantPresent: []runtime.Object{ + makeReadyClusterChannelProvisionerOld(), + makeK8sServiceOld(), + }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, + }, { Name: "Create dispatcher succeeds - request is namespace-scoped", InitialState: []runtime.Object{ @@ -249,7 +264,7 @@ func TestReconcile(t *testing.T) { makeReadyClusterChannelProvisioner(), makeK8sService(), }, - ReconcileKey: fmt.Sprintf("%s/%s", testNS, Name), + ReconcileKey: fmt.Sprintf("%s/%s", testNS, inMemoryName), WantEvent: []corev1.Event{ events[ccpReconciled], }, @@ -297,13 +312,19 @@ func TestReconcile(t *testing.T) { logger: zap.NewNop(), } if tc.ReconcileKey == "" { - 
tc.ReconcileKey = fmt.Sprintf("/%s", Name) + tc.ReconcileKey = fmt.Sprintf("/%s", inMemoryName) } tc.IgnoreTimes = true t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } +func makeClusterChannelProvisionerOld() *eventingv1alpha1.ClusterChannelProvisioner { + ccp := makeClusterChannelProvisioner() + ccp.SetName(inMemoryChannelName) + return ccp +} + func makeClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner { return &eventingv1alpha1.ClusterChannelProvisioner{ TypeMeta: metav1.TypeMeta{ @@ -311,7 +332,7 @@ func makeClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner Kind: "ClusterChannelProvisioner", }, ObjectMeta: metav1.ObjectMeta{ - Name: Name, + Name: inMemoryName, UID: ccpUID, }, Spec: eventingv1alpha1.ClusterChannelProvisionerSpec{}, @@ -328,6 +349,12 @@ func makeReadyClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvis return ccp } +func makeReadyClusterChannelProvisionerOld() *eventingv1alpha1.ClusterChannelProvisioner { + ccp := makeReadyClusterChannelProvisioner() + ccp.Name = inMemoryChannelName + return ccp +} + func makeDeletingClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner { ccp := makeClusterChannelProvisioner() ccp.DeletionTimestamp = &deletionTime @@ -342,35 +369,43 @@ func makeK8sService() *corev1.Service { }, ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace(), - Name: fmt.Sprintf("%s-dispatcher", Name), + Name: fmt.Sprintf("%s-dispatcher", inMemoryName), OwnerReferences: []metav1.OwnerReference{ { APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), Kind: "ClusterChannelProvisioner", - Name: Name, + Name: inMemoryName, UID: ccpUID, Controller: &truePointer, BlockOwnerDeletion: &truePointer, }, }, - Labels: util.DispatcherLabels(Name), + Labels: util.DispatcherLabels(inMemoryName), }, Spec: corev1.ServiceSpec{ - Selector: util.DispatcherLabels(Name), + Selector: util.DispatcherLabels(inMemoryChannelName), Ports: []corev1.ServicePort{ { - Name: "http", 
Port: 80, TargetPort: intstr.FromInt(8080), + Protocol: corev1.ProtocolTCP, }, }, }, } } +func makeK8sServiceOld() *corev1.Service { + svc := makeK8sService() + svc.SetName(fmt.Sprintf("%s-dispatcher", inMemoryChannelName)) + svc.GetOwnerReferences()[0].Name = inMemoryChannelName + svc.SetLabels(util.DispatcherLabels(inMemoryChannelName)) + return svc +} + func makeOldK8sService() *corev1.Service { svc := makeK8sService() - svc.ObjectMeta.Name = fmt.Sprintf("%s-clusterbus", Name) + svc.ObjectMeta.Name = fmt.Sprintf("%s-clusterbus", inMemoryName) return svc } diff --git a/pkg/provisioners/inmemory/controller/main.go b/pkg/provisioners/inmemory/controller/main.go index d8da2d062b4..2b09c992b4f 100644 --- a/pkg/provisioners/inmemory/controller/main.go +++ b/pkg/provisioners/inmemory/controller/main.go @@ -29,6 +29,8 @@ import ( "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). 
+ // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) func main() { diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index 4afe9d4aea0..0a3653df75f 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -5,6 +5,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -19,7 +20,10 @@ import ( "github.com/knative/pkg/system" ) -func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner) (*corev1.Service, error) { +// ServiceOption can be used to optionally modify the K8s default that gets created for the Dispatcher in CreateDispatcherService +type ServiceOption func(*v1.Service) error + +func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner, opts ...ServiceOption) (*corev1.Service, error) { svcKey := types.NamespacedName{ Namespace: system.Namespace(), Name: channelDispatcherServiceName(ccp.Name), @@ -29,7 +33,12 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c err := client.Get(ctx, svcKey, svc) return svc, err } - return createK8sService(ctx, client, getSvc, newDispatcherService(ccp)) + svc, err := newDispatcherService(ccp, opts...) + if err != nil { + return nil, err + } + + return createK8sService(ctx, client, getSvc, svc) } func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterChannelProvisioner) error { @@ -50,9 +59,9 @@ func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeCl // newDispatcherService creates a new Service for a ClusterChannelProvisioner resource. 
It also sets // the appropriate OwnerReferences on the resource so handleObject can discover // the ClusterChannelProvisioner resource that 'owns' it. -func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *corev1.Service { +func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner, opts ...ServiceOption) (*corev1.Service, error) { labels := DispatcherLabels(ccp.Name) - return &corev1.Service{ + svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: channelDispatcherServiceName(ccp.Name), Namespace: system.Namespace(), @@ -69,13 +78,24 @@ func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *core Selector: labels, Ports: []corev1.ServicePort{ { - Name: "http", + // There is a bug in Istio where named port doesn't work when connecting using an ExternalName service + // Refer to https://github.com/istio/istio/issues/13193 for more details. + // TODO: Uncomment Name:"http" when ISTIO fixes the issue + // Name: "http", Port: 80, + Protocol: corev1.ProtocolTCP, TargetPort: intstr.FromInt(8080), }, }, }, } + + for _, opt := range opts { + if err := opt(svc); err != nil { + return nil, err + } + } + return svc, nil } func DispatcherLabels(ccpName string) map[string]string { diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index cdf2eb724e6..fcd6a9dafe1 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -180,9 +180,9 @@ func makeDispatcherService() *corev1.Service { Selector: DispatcherLabels(clusterChannelProvisionerName), Ports: []corev1.ServicePort{ { - Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), + Protocol: corev1.ProtocolTCP, }, }, }, diff --git a/pkg/reconciler/v1alpha1/broker/resources/ingress.go b/pkg/reconciler/v1alpha1/broker/resources/ingress.go index f83a991a7b1..3bde11755e9 100644 --- a/pkg/reconciler/v1alpha1/broker/resources/ingress.go +++ 
b/pkg/reconciler/v1alpha1/broker/resources/ingress.go @@ -58,6 +58,8 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: ingressLabels(args.Broker), + // TODO: Remove this annotation once all channels stop using istio virtual service + // https://github.com/knative/eventing/issues/294 Annotations: map[string]string{ "sidecar.istio.io/inject": "true", }, diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go index 8872b7026ac..282a1c0985d 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go @@ -30,7 +30,6 @@ import ( "net/http" "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/sidecar/fanout" "go.uber.org/zap" ) @@ -45,27 +44,19 @@ type Config struct { type ChannelConfig struct { Namespace string `json:"namespace"` Name string `json:"name"` + HostName string `json:"hostname"` FanoutConfig fanout.Config `json:"fanoutConfig"` } -// MakeChannelKey creates the key used for this Channel in the Handler's handlers map. -func makeChannelKey(namespace, name string) string { - return fmt.Sprintf("%s/%s", namespace, name) -} - // makeChannelKeyFromConfig creates the channel key for a given channelConfig. It is a helper around // MakeChannelKey. func makeChannelKeyFromConfig(config ChannelConfig) string { - return makeChannelKey(config.Namespace, config.Name) + return config.HostName } // getChannelKey extracts the channel key from the given HTTP request. 
-func getChannelKey(r *http.Request) (string, error) { - cr, err := provisioners.ParseChannel(r.Host) - if err != nil { - return "", err - } - return makeChannelKey(cr.Namespace, cr.Name), nil +func getChannelKey(r *http.Request) string { + return r.Host } // Handler is an http.Handler that introspects the incoming request to determine what Channel it is @@ -114,12 +105,7 @@ func (h *Handler) CopyWithNewConfig(conf Config) (*Handler, error) { // ServeHTTP delegates the actual handling of the request to a fanout.Handler, based on the // request's channel key. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - channelKey, err := getChannelKey(r) - if err != nil { - h.logger.Error("Unable to extract channelKey", zap.Error(err)) - w.WriteHeader(http.StatusInternalServerError) - return - } + channelKey := getChannelKey(r) fh, ok := h.handlers[channelKey] if !ok { h.logger.Error("Unable to find a handler for request", zap.String("channelKey", channelKey)) diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go index e6c9c30d048..32b86bdc84a 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go @@ -34,33 +34,6 @@ const ( replaceDomain = "replaceDomain" ) -func TestMakeChannelKey(t *testing.T) { - testCases := []struct { - namespace string - name string - key string - }{ - { - namespace: "default", - name: "channel", - key: "default/channel", - }, - { - namespace: "foo", - name: "bar", - key: "foo/bar", - }, - } - for _, tc := range testCases { - name := fmt.Sprintf("%s, %s -> %s", tc.namespace, tc.name, tc.key) - t.Run(name, func(t *testing.T) { - if key := makeChannelKey(tc.namespace, tc.name); key != tc.key { - t.Errorf("Unexpected ChannelKey. Expected '%v'. 
Actual '%v'", tc.key, key) - } - }) - } -} - func TestNewHandler(t *testing.T) { testCases := []struct { name string @@ -72,16 +45,14 @@ func TestNewHandler(t *testing.T) { config: Config{ ChannelConfigs: []ChannelConfig{ { - Namespace: "default", - Name: "duplicate", + HostName: "duplicatekey", }, { - Namespace: "default", - Name: "duplicate", + HostName: "duplicatekey", }, }, }, - createErr: "duplicate channel key: default/duplicate", + createErr: "duplicate channel key: duplicatekey", }, } @@ -241,8 +212,9 @@ func TestServeHTTP(t *testing.T) { config: Config{ ChannelConfigs: []ChannelConfig{ { - Namespace: "default", - Name: "first-channel", + Namespace: "ns", + Name: "name", + HostName: "first-channel.default", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -261,8 +233,10 @@ func TestServeHTTP(t *testing.T) { config: Config{ ChannelConfigs: []ChannelConfig{ { - Namespace: "default", - Name: "first-channel", + + Namespace: "ns", + Name: "name", + HostName: "first-channel.default", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -274,6 +248,7 @@ func TestServeHTTP(t *testing.T) { { Namespace: "default", Name: "second-channel", + HostName: "second-channel.default", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -303,7 +278,7 @@ func TestServeHTTP(t *testing.T) { h, err := NewHandler(zap.NewNop(), tc.config) if err != nil { - t.Errorf("Unexpected NewHandler error: '%v'", err) + t.Fatalf("Unexpected NewHandler error: '%v'", err) } r := requestWithChannelKey(tc.key) diff --git a/pkg/sidecar/swappable/swappable_test.go b/pkg/sidecar/swappable/swappable_test.go index 7ee97d00955..b4cc0daa872 100644 --- a/pkg/sidecar/swappable/swappable_test.go +++ b/pkg/sidecar/swappable/swappable_test.go @@ -30,9 +30,8 @@ import ( ) const ( - namespace = "default" - name = "channel1" replaceDomain = "replaceDomain" + hostName = "a.b.c.d" ) func TestHandler(t 
*testing.T) { @@ -44,8 +43,7 @@ func TestHandler(t *testing.T) { { ChannelConfigs: []multichannelfanout.ChannelConfig{ { - Namespace: namespace, - Name: name, + HostName: hostName, FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -59,8 +57,7 @@ func TestHandler(t *testing.T) { { ChannelConfigs: []multichannelfanout.ChannelConfig{ { - Namespace: namespace, - Name: name, + HostName: hostName, FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -96,8 +93,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) { initialConfig: multichannelfanout.Config{ ChannelConfigs: []multichannelfanout.ChannelConfig{ { - Namespace: namespace, - Name: name, + HostName: hostName, FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -112,12 +108,10 @@ func TestHandler_InvalidConfigChange(t *testing.T) { // Duplicate (namespace, name). ChannelConfigs: []multichannelfanout.ChannelConfig{ { - Namespace: namespace, - Name: name, + HostName: hostName, }, { - Namespace: namespace, - Name: name, + HostName: hostName, }, }, }, @@ -183,7 +177,7 @@ func updateConfigAndTest(t *testing.T, h *Handler, config multichannelfanout.Con func assertRequestAccepted(t *testing.T, h *Handler) { w := httptest.NewRecorder() - h.ServeHTTP(w, makeRequest(namespace, name)) + h.ServeHTTP(w, makeRequest(hostName)) if w.Code != http.StatusAccepted { t.Errorf("Unexpected response code. Expected 202. 
Actual %v", w.Code) } @@ -196,8 +190,8 @@ func (*successHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { _ = r.Body.Close() } -func makeRequest(namespace, name string) *http.Request { - r := httptest.NewRequest("POST", fmt.Sprintf("http://%s.%s/", name, namespace), strings.NewReader("")) +func makeRequest(hostName string) *http.Request { + r := httptest.NewRequest("POST", fmt.Sprintf("http://%s/", hostName), strings.NewReader("")) return r } diff --git a/test/crd.go b/test/crd.go index efbab8f3888..53864c882cc 100644 --- a/test/crd.go +++ b/test/crd.go @@ -31,6 +31,9 @@ const ( // ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func ClusterChannelProvisioner(name string) *corev1.ObjectReference { + if name == "" { + return nil + } return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventsApiVersion, name) } diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index e8d7037e247..697031fbfb7 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -627,40 +627,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -=========================================================== -Import: github.com/knative/eventing/vendor/github.com/fsnotify/fsnotify - -Copyright (c) 2012 The Go Authors. All rights reserved. -Copyright (c) 2012 fsnotify Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. 
nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - - =========================================================== Import: github.com/knative/eventing/vendor/github.com/ghodss/yaml