diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 370289ffb5e..fc1927d0f01 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -29,15 +29,9 @@ import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/channelwatcher" - "github.com/knative/eventing/pkg/logging" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "github.com/knative/eventing/pkg/sidecar/swappable" "go.uber.org/zap" "go.uber.org/zap/zapcore" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" @@ -137,48 +131,11 @@ func setupChannelWatcher(logger *zap.Logger, configUpdated swappable.UpdateConfi logger.Error("Error while adding eventing scheme to manager.", zap.Error(err)) return nil, err } - channelwatcher.New(mgr, logger, updateChannelConfig(configUpdated)) + channelwatcher.New(mgr, logger, channelwatcher.UpdateConfigWatchHandler(configUpdated, shouldWatch)) return mgr, nil } -func updateChannelConfig(updateConfig swappable.UpdateConfig) channelwatcher.WatchHandlerFunc { - return func(ctx context.Context, c client.Client, chanNamespacedName types.NamespacedName) error { - channels, err := listAllChannels(ctx, c) - if err != nil { - logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err)) - return err - } - config := multiChannelFanoutConfig(channels) - return updateConfig(config) - } -} - -func listAllChannels(ctx context.Context, c client.Client) ([]v1alpha1.Channel, error) { - channels := make([]v1alpha1.Channel, 0) - for { - cl := &v1alpha1.ChannelList{} - opts := &client.ListOptions{ - // Set Raw because if we need to get more than one page, then we will put the continue token - // into opts.Raw.Continue. - Raw: &metav1.ListOptions{}, - } - if err := c.List(ctx, opts, cl); err != nil { - return nil, err - } - for _, c := range cl.Items { - if c.Status.IsReady() && shouldWatch(&c) { - channels = append(channels, c) - } - } - if cl.Continue != "" { - opts.Raw.Continue = cl.Continue - } else { - return channels, nil - } - } -} - func shouldWatch(ch *v1alpha1.Channel) bool { if ch.Spec.Provisioner != nil && ch.Spec.Provisioner.Namespace == "" { for _, v := range channelProvisioners { @@ -190,26 +147,6 @@ func shouldWatch(ch *v1alpha1.Channel) bool { return false } -func multiChannelFanoutConfig(channels []v1alpha1.Channel) *multichannelfanout.Config { - cc := make([]multichannelfanout.ChannelConfig, 0) - for _, c := range channels { - channelConfig := multichannelfanout.ChannelConfig{ - Namespace: c.Namespace, - Name: c.Name, - HostName: c.Status.Address.Hostname, - } - if c.Spec.Subscribable != nil { - channelConfig.FanoutConfig = fanout.Config{ - Subscriptions: c.Spec.Subscribable.Subscribers, - } - } - cc = append(cc, channelConfig) - } - return &multichannelfanout.Config{ - ChannelConfigs: cc, - } -} - // runnableServer is a small wrapper around http.Server so that it matches the manager.Runnable // interface. type runnableServer struct { diff --git a/contrib/gcppubsub/pkg/dispatcher/cmd/main.go b/contrib/gcppubsub/pkg/dispatcher/cmd/main.go index a2a82a4e638..078e08bf2d4 100644 --- a/contrib/gcppubsub/pkg/dispatcher/cmd/main.go +++ b/contrib/gcppubsub/pkg/dispatcher/cmd/main.go @@ -61,7 +61,10 @@ func main() { // PubSub) and the dispatcher (takes messages in PubSub and sends them in cluster) in this // binary. - _, runnables := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator) + _, runnables, err := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator) + if err != nil { + logger.Fatal("Unable to create new receiver and runnable", zap.Error(err)) + } for _, runnable := range runnables { err = mgr.Add(runnable) if err != nil { diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go index 702ee7fd5f4..665bb80dda5 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go @@ -44,7 +44,7 @@ type Receiver struct { // New creates a new Receiver and its associated MessageReceiver. The caller is responsible for // Start()ing the returned MessageReceiver. -func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubSubClientCreator) (*Receiver, []manager.Runnable) { +func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubSubClientCreator) (*Receiver, []manager.Runnable, error) { r := &Receiver{ logger: logger, client: client, @@ -52,10 +52,14 @@ func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubS pubSubClientCreator: pubSubClientCreator, cache: cache.NewTTL(), } - return r, []manager.Runnable{r.newMessageReceiver(), r.cache} + receiver, err := r.newMessageReceiver() + if err != nil { + return nil, nil, err + } + return r, []manager.Runnable{receiver, r.cache}, nil } -func (r *Receiver) newMessageReceiver() *provisioners.MessageReceiver { +func (r *Receiver) newMessageReceiver() (*provisioners.MessageReceiver, error) { return provisioners.NewMessageReceiver(r.sendEventToTopic, r.logger.Sugar()) } diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go index 6d9b2353b30..c4789c2c9ed 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go @@ -129,14 +129,21 @@ func TestReceiver(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - mr, _ := New( + mr, _, err := New( zap.NewNop(), fake.NewFakeClient(tc.initialState...), fakepubsub.Creator(tc.pubSubData)) + if err != nil { + t.Fatalf("Error when creating a New receiver. Error:%s", err) + } resp := httptest.NewRecorder() req := httptest.NewRequest("POST", "/", strings.NewReader(validMessage)) req.Host = "test-channel.test-namespace.channels." + utils.GetClusterDomainName() - mr.newMessageReceiver().HandleRequest(resp, req) + receiver, err := mr.newMessageReceiver() + if err != nil { + t.Fatalf("Error when creating a new message receiver. Error:%s", err) + } + receiver.HandleRequest(resp, req) if tc.expectedErr { if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 { t.Errorf("Expected an error. Actual: %v", resp.Result()) diff --git a/contrib/kafka/cmd/controller/main.go b/contrib/kafka/cmd/controller/main.go index 375361f4af3..be99d7231ef 100644 --- a/contrib/kafka/cmd/controller/main.go +++ b/contrib/kafka/cmd/controller/main.go @@ -8,7 +8,6 @@ import ( "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -47,7 +46,6 @@ func _main() int { // Add custom types to this array to get them into the manager's scheme. schemeFuncs := []SchemeFunc{ eventingv1alpha.AddToScheme, - istiov1alpha3.AddToScheme, } for _, schemeFunc := range schemeFuncs { schemeFunc(mgr.GetScheme()) diff --git a/contrib/kafka/cmd/dispatcher/main.go b/contrib/kafka/cmd/dispatcher/main.go index 9ef18689623..b0d1c2ac286 100644 --- a/contrib/kafka/cmd/dispatcher/main.go +++ b/contrib/kafka/cmd/dispatcher/main.go @@ -18,38 +18,25 @@ package main import ( "flag" - "fmt" "log" - "os" + "github.com/knative/eventing/contrib/kafka/pkg/controller" provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" "github.com/knative/eventing/contrib/kafka/pkg/dispatcher" - "github.com/knative/eventing/pkg/sidecar/configmap/watcher" - "github.com/knative/eventing/pkg/utils" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/channelwatcher" "github.com/knative/pkg/signals" - "github.com/knative/pkg/system" "go.uber.org/zap" - "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" ) func main() { - configMapName := os.Getenv("DISPATCHER_CONFIGMAP_NAME") - if configMapName == "" { - configMapName = provisionerController.DispatcherConfigMapName - } - configMapNamespace := os.Getenv("DISPATCHER_CONFIGMAP_NAMESPACE") - if configMapNamespace == "" { - configMapNamespace = system.Namespace() - } - flag.Parse() logger, err := zap.NewProduction() if err != nil { log.Fatalf("unable to create logger: %v", err) } - provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner") if err != nil { logger.Fatal("unable to load provisioner config", zap.Error(err)) @@ -68,17 +55,12 @@ func main() { logger.Fatal("Unable to add kafkaDispatcher", zap.Error(err)) } - kc, err := kubernetes.NewForConfig(mgr.GetConfig()) - if err != nil { - logger.Fatal("unable to create kubernetes client.", zap.Error(err)) + if err := v1alpha1.AddToScheme(mgr.GetScheme()); err != nil { + logger.Fatal("Unable to add scheme for eventing apis.", zap.Error(err)) } - cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, kafkaDispatcher.UpdateConfig) - if err != nil { - logger.Fatal("unable to create configMap watcher", zap.String("configMap", fmt.Sprintf("%s/%s", configMapNamespace, configMapName))) - } - if err = mgr.Add(utils.NewBlockingStart(logger, cmw)); err != nil { - logger.Fatal("Unable to add the configMap watcher to the manager", zap.Error(err)) + if err := channelwatcher.New(mgr, logger, channelwatcher.UpdateConfigWatchHandler(kafkaDispatcher.UpdateConfig, shouldWatch)); err != nil { + logger.Fatal("Unable to create channel watcher.", zap.Error(err)) } // set up signals so we handle the first shutdown signal gracefully @@ -89,3 +71,9 @@ func main() { } logger.Info("Exiting...") } + +func shouldWatch(ch *v1alpha1.Channel) bool { + return ch.Spec.Provisioner != nil && + ch.Spec.Provisioner.Namespace == "" && + ch.Spec.Provisioner.Name == controller.Name +} diff --git a/contrib/kafka/config/kafka.yaml b/contrib/kafka/config/kafka.yaml index dc28a8636da..31506c804de 100644 --- a/contrib/kafka/config/kafka.yaml +++ b/contrib/kafka/config/kafka.yaml @@ -68,22 +68,12 @@ rules: verbs: - update - apiGroups: - - networking.istio.io + - "" # Core API Group. resources: - - virtualservices + - events verbs: - - get - - list - - watch - - create - - update - - apiGroups: - - "" # Core API Group. - resources: - - events - verbs: - - create - - patch + - create + - patch - update --- @@ -170,6 +160,13 @@ rules: - get - list - watch + - apiGroups: + - eventing.knative.dev + resources: + - channels + verbs: + - list + - watch --- @@ -211,13 +208,6 @@ spec: containers: - name: dispatcher image: github.com/knative/eventing/contrib/kafka/cmd/dispatcher - env: - - name: DISPATCHER_CONFIGMAP_NAME - value: kafka-channel-dispatcher - - name: DISPATCHER_CONFIGMAP_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace volumeMounts: - name: kafka-channel-controller-config mountPath: /etc/config-provisioner diff --git a/contrib/kafka/main.go b/contrib/kafka/main.go index ed98481c20b..62df224cc98 100644 --- a/contrib/kafka/main.go +++ b/contrib/kafka/main.go @@ -8,7 +8,6 @@ import ( "github.com/knative/eventing/contrib/kafka/pkg/controller/channel" eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -47,7 +46,6 @@ func main() { // Add custom types to this array to get them into the manager's scheme. schemeFuncs := []SchemeFunc{ eventingv1alpha.AddToScheme, - istiov1alpha3.AddToScheme, } for _, schemeFunc := range schemeFuncs { schemeFunc(mgr.GetScheme()) diff --git a/contrib/kafka/pkg/controller/channel/provider.go b/contrib/kafka/pkg/controller/channel/provider.go index 7c9d413d246..73eab2e8d22 100644 --- a/contrib/kafka/pkg/controller/channel/provider.go +++ b/contrib/kafka/pkg/controller/channel/provider.go @@ -18,7 +18,6 @@ package channel import ( "github.com/Shopify/sarama" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -49,11 +48,10 @@ var ( ) type reconciler struct { - client client.Client - recorder record.EventRecorder - logger *zap.Logger - config *common.KafkaProvisionerConfig - configMapKey client.ObjectKey + client client.Client + recorder record.EventRecorder + logger *zap.Logger + config *common.KafkaProvisionerConfig // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. kafkaClusterAdmin sarama.ClusterAdmin @@ -67,10 +65,9 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi // Setup a new controller to Reconcile Channel. c, err := controller.New(controllerAgentName, mgr, controller.Options{ Reconciler: &reconciler{ - recorder: mgr.GetRecorder(controllerAgentName), - logger: logger, - config: config, - configMapKey: defaultConfigMapKey, + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, + config: config, }, }) if err != nil { @@ -89,13 +86,6 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi return nil, err } - // Watch the VirtualServices that are owned by Channels. - err = c.Watch(&source.Kind{Type: &istiov1alpha3.VirtualService{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) - if err != nil { - logger.Error("unable to watch VirtualServices.", zap.Error(err)) - return nil, err - } - return c, nil } diff --git a/contrib/kafka/pkg/controller/channel/reconcile.go b/contrib/kafka/pkg/controller/channel/reconcile.go index cf64955bc43..34a7e1c9b71 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile.go +++ b/contrib/kafka/pkg/controller/channel/reconcile.go @@ -23,10 +23,8 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -35,10 +33,6 @@ import ( util "github.com/knative/eventing/pkg/provisioners" topicUtils "github.com/knative/eventing/pkg/provisioners/utils" eventingNames "github.com/knative/eventing/pkg/reconciler/names" - "github.com/knative/eventing/pkg/sidecar/configmap" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - "k8s.io/apimachinery/pkg/api/equality" ) const ( @@ -97,30 +91,28 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, nil } - newChannel := channel.DeepCopy() - - newChannel.Status.InitializeConditions() + channel.Status.InitializeConditions() var requeue = false if clusterChannelProvisioner.Status.IsReady() { // Reconcile this copy of the Channel and then write back any status // updates regardless of whether the reconcile error out. - requeue, err = r.reconcile(ctx, newChannel) + requeue, err = r.reconcile(ctx, channel) } else { - newChannel.Status.MarkNotProvisioned("NotProvisioned", "ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name) + channel.Status.MarkNotProvisioned("NotProvisioned", "ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name) err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name) } if err != nil { r.logger.Error("Dispatcher reconciliation failed", zap.Error(err)) - r.recorder.Eventf(newChannel, v1.EventTypeWarning, dispatcherReconcileFailed, "Dispatcher reconciliation failed: %v", err) + r.recorder.Eventf(channel, v1.EventTypeWarning, dispatcherReconcileFailed, "Dispatcher reconciliation failed: %v", err) } else { r.logger.Debug("Channel reconciled") } - if updateChannelErr := util.UpdateChannel(ctx, r.client, newChannel); updateChannelErr != nil { + if updateChannelErr := util.UpdateChannel(ctx, r.client, channel); updateChannelErr != nil { r.logger.Info("failed to update channel status", zap.Error(updateChannelErr)) - r.recorder.Eventf(newChannel, v1.EventTypeWarning, dispatcherUpdateStatusFailed, "Failed to update Channel's dispatcher status: %v", err) + r.recorder.Eventf(channel, v1.EventTypeWarning, dispatcherUpdateStatusFailed, "Failed to update Channel's dispatcher status: %v", err) return reconcile.Result{}, updateChannelErr } @@ -134,13 +126,6 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // boolean indicates if this Channel should be immediately requeued for another reconcile loop. The // returned error indicates an error during reconciliation. func (r *reconciler) reconcile(ctx context.Context, channel *eventingv1alpha1.Channel) (bool, error) { - - // We always need to sync the Channel config, so do it first. - if err := r.syncChannelConfig(ctx); err != nil { - r.logger.Info("error updating syncing the Channel config", zap.Error(err)) - return false, err - } - // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. // Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently @@ -177,19 +162,12 @@ func (r *reconciler) reconcile(ctx context.Context, channel *eventingv1alpha1.Ch return false, err } - svc, err := util.CreateK8sService(ctx, r.client, channel) + svc, err := util.CreateK8sService(ctx, r.client, channel, util.ExternalService(channel)) if err != nil { r.logger.Info("error creating the Channel's K8s Service", zap.Error(err)) return false, err } channel.Status.SetAddress(eventingNames.ServiceHostName(svc.Name, svc.Namespace)) - - _, err = util.CreateVirtualService(ctx, r.client, channel, svc) - if err != nil { - r.logger.Info("error creating the Virtual Service for the Channel", zap.Error(err)) - return false, err - } - channel.Status.MarkProvisioned() // close the connection @@ -268,110 +246,6 @@ func (r *reconciler) getClusterChannelProvisioner() (*eventingv1alpha1.ClusterCh return clusterChannelProvisioner, nil } -func (r *reconciler) syncChannelConfig(ctx context.Context) error { - channels, err := r.listAllChannels(ctx) - if err != nil { - r.logger.Info("Unable to list channels", zap.Error(err)) - return err - } - config := multiChannelFanoutConfig(channels) - return r.writeConfigMap(ctx, config) -} - -func (r *reconciler) writeConfigMap(ctx context.Context, config *multichannelfanout.Config) error { - logger := r.logger.With(zap.Any("configMap", r.configMapKey)) - - updated, err := configmap.SerializeConfig(*config) - if err != nil { - r.logger.Error("Unable to serialize config", zap.Error(err), zap.Any("config", config)) - return err - } - - cm := &corev1.ConfigMap{} - err = r.client.Get(ctx, r.configMapKey, cm) - if errors.IsNotFound(err) { - cm = r.createNewConfigMap(updated) - err = r.client.Create(ctx, cm) - if err != nil { - logger.Info("Unable to create ConfigMap", zap.Error(err)) - return err - } - } - if err != nil { - logger.Info("Unable to get ConfigMap", zap.Error(err)) - return err - } - - if equality.Semantic.DeepEqual(cm.Data, updated) { - // Nothing to update. - return nil - } - - cm.Data = updated - return r.client.Update(ctx, cm) -} - -func (r *reconciler) createNewConfigMap(data map[string]string) *corev1.ConfigMap { - return &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: r.configMapKey.Namespace, - Name: r.configMapKey.Name, - }, - Data: data, - } -} - -func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannelfanout.Config { - cc := make([]multichannelfanout.ChannelConfig, 0) - for _, c := range channels { - channelConfig := multichannelfanout.ChannelConfig{ - Namespace: c.Namespace, - Name: c.Name, - } - if c.Spec.Subscribable != nil { - channelConfig.FanoutConfig = fanout.Config{ - Subscriptions: c.Spec.Subscribable.Subscribers, - } - } - cc = append(cc, channelConfig) - } - return &multichannelfanout.Config{ - ChannelConfigs: cc, - } -} - -func (r *reconciler) listAllChannels(ctx context.Context) ([]eventingv1alpha1.Channel, error) { - clusterChannelProvisioner, err := r.getClusterChannelProvisioner() - if err != nil { - return nil, err - } - - channels := make([]eventingv1alpha1.Channel, 0) - - opts := &client.ListOptions{ - // Set Raw because if we need to get more than one page, then we will put the continue token - // into opts.Raw.Continue. - Raw: &metav1.ListOptions{}, - } - for { - cl := &eventingv1alpha1.ChannelList{} - if err = r.client.List(ctx, opts, cl); err != nil { - return nil, err - } - - for _, c := range cl.Items { - if r.shouldReconcile(&c, clusterChannelProvisioner) { - channels = append(channels, c) - } - } - if cl.Continue != "" { - opts.Raw.Continue = cl.Continue - } else { - return channels, nil - } - } -} - func createKafkaAdminClient(config *controller.KafkaProvisionerConfig) (sarama.ClusterAdmin, error) { saramaConf := sarama.NewConfig() saramaConf.Version = sarama.V1_1_0_0 diff --git a/contrib/kafka/pkg/controller/channel/reconcile_test.go b/contrib/kafka/pkg/controller/channel/reconcile_test.go index aeec9ab7de1..02836e06a54 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile_test.go +++ b/contrib/kafka/pkg/controller/channel/reconcile_test.go @@ -30,10 +30,11 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/system" _ "github.com/knative/pkg/system/testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -70,7 +71,6 @@ var ( func init() { // Add types to scheme eventingv1alpha1.AddToScheme(scheme.Scheme) - istiov1alpha3.AddToScheme(scheme.Scheme) } var mockFetchError = controllertesting.Mocks{ @@ -142,7 +142,6 @@ var testCases = []controllertesting.TestCase{ InitialState: []runtime.Object{ getNewClusterChannelProvisioner(clusterChannelProvisionerName, true), getNewChannel(channelName, clusterChannelProvisionerName), - makeVirtualService(), }, WantResult: reconcile.Result{ Requeue: true, @@ -156,10 +155,10 @@ var testCases = []controllertesting.TestCase{ InitialState: []runtime.Object{ getNewClusterChannelProvisioner(clusterChannelProvisionerName, true), getNewChannelWithStatusAndFinalizer(channelName, clusterChannelProvisionerName), - makeVirtualService(), }, WantPresent: []runtime.Object{ getNewChannelProvisionedStatus(channelName, clusterChannelProvisionerName), + makeK8sService(), }, }, { @@ -523,18 +522,35 @@ func getNewClusterChannelProvisioner(name string, isReady bool) *eventingv1alpha return clusterChannelProvisioner } -func makeVirtualService() *istiov1alpha3.VirtualService { - return &istiov1alpha3.VirtualService{ +func om(namespace, name string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), + UID: testUID, + } +} + +func getControllerConfig() *controller.KafkaProvisionerConfig { + return &controller.KafkaProvisionerConfig{ + Brokers: []string{"test-broker"}, + } +} + +func makeK8sService() *corev1.Service { + return &corev1.Service{ TypeMeta: metav1.TypeMeta{ - APIVersion: istiov1alpha3.SchemeGroupVersion.String(), - Kind: "VirtualService", + APIVersion: "v1", + Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-channel", testNS), - Namespace: testNS, + GenerateName: fmt.Sprintf("%s-channel-", channelName), + Namespace: testNS, Labels: map[string]string{ - "channel": channelName, - "provisioner": clusterChannelProvisionerName, + util.EventingChannelLabel: channelName, + util.OldEventingChannelLabel: channelName, + util.EventingProvisionerLabel: clusterChannelProvisionerName, + util.OldEventingProvisionerLabel: clusterChannelProvisionerName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -547,38 +563,9 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, }, }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - serviceAddress, - fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()), - }, - HTTP: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()), - }, - Route: []istiov1alpha3.HTTPRouteDestination{{ - Destination: istiov1alpha3.Destination{ - Host: "kafka-provisioner.knative-testing.svc." + utils.GetClusterDomainName(), - Port: istiov1alpha3.PortSelector{ - Number: util.PortNumber, - }, - }}, - }}, - }, + Spec: corev1.ServiceSpec{ + ExternalName: names.ServiceHostName(fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName), system.Namespace()), + Type: "ExternalName", }, } } - -func om(namespace, name string) metav1.ObjectMeta { - return metav1.ObjectMeta{ - Namespace: namespace, - Name: name, - SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), - } -} - -func getControllerConfig() *controller.KafkaProvisionerConfig { - return &controller.KafkaProvisionerConfig{ - Brokers: []string{"test-broker"}, - } -} diff --git a/contrib/kafka/pkg/dispatcher/dispatcher.go b/contrib/kafka/pkg/dispatcher/dispatcher.go index a7f362f2575..718abd88584 100644 --- a/contrib/kafka/pkg/dispatcher/dispatcher.go +++ b/contrib/kafka/pkg/dispatcher/dispatcher.go @@ -34,8 +34,10 @@ import ( ) type KafkaDispatcher struct { - config atomic.Value - updateLock sync.Mutex + // TODO: config doesn't have to be atomic as it is read and updated using updateLock. + config atomic.Value + hostToChannelMap atomic.Value + updateLock sync.Mutex receiver *provisioners.MessageReceiver dispatcher *provisioners.MessageDispatcher @@ -83,10 +85,10 @@ type subscription struct { ReplyURI string } -// ConfigDiff diffs the new config with the existing config. If there are no differences, then the +// configDiff diffs the new config with the existing config. If there are no differences, then the // empty string is returned. If there are differences, then a non-empty string is returned // describing the differences. -func (d *KafkaDispatcher) ConfigDiff(updated *multichannelfanout.Config) string { +func (d *KafkaDispatcher) configDiff(updated *multichannelfanout.Config) string { return cmp.Diff(d.getConfig(), updated) } @@ -98,12 +100,21 @@ func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error d.updateLock.Lock() defer d.updateLock.Unlock() - if diff := d.ConfigDiff(config); diff != "" { + if diff := d.configDiff(config); diff != "" { d.logger.Info("Updating config (-old +new)", zap.String("diff", diff)) + // Create hostToChannelMap before updating kafkaConsumers. + // But update the map only after updating kafkaConsumers. + hcMap, err := createHostToChannelMap(config) + if err != nil { + return err + } + newSubs := make(map[subscription]bool) - // Subscribe to new subscriptions + // Subscribe to new subscriptions. + // TODO: Error returned by subscribe/unsubscribe must be handled. + // https://github.com/knative/eventing/issues/1072. for _, cc := range config.ChannelConfigs { channelRef := provisioners.ChannelReference{ Name: cc.Name, @@ -129,6 +140,9 @@ func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error } } } + // At this point all updates are done and hostToChannelMap is created successfully. + // Update the atomic value. + d.setHostToChannelMap(hcMap) // Update the config so that it can be used for comparison during next sync d.setConfig(config) @@ -136,6 +150,23 @@ func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error return nil } +func createHostToChannelMap(config *multichannelfanout.Config) (map[string]provisioners.ChannelReference, error) { + hcMap := make(map[string]provisioners.ChannelReference, len(config.ChannelConfigs)) + for _, cConfig := range config.ChannelConfigs { + if cr, ok := hcMap[cConfig.HostName]; ok { + return nil, fmt.Errorf( + "duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", + cConfig.HostName, + cConfig.Namespace, + cConfig.Name, + cr.Namespace, + cr.Name) + } + hcMap[cConfig.HostName] = provisioners.ChannelReference{Name: cConfig.Name, Namespace: cConfig.Namespace} + } + return hcMap, nil +} + // Start starts the kafka dispatcher's message processing. func (d *KafkaDispatcher) Start(stopCh <-chan struct{}) error { if d.receiver == nil { @@ -162,6 +193,8 @@ func (d *KafkaDispatcher) Start(stopCh <-chan struct{}) error { return d.receiver.Start(stopCh) } +// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. +// subscribe must be called under updateLock. func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, sub subscription) error { d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) @@ -234,6 +267,8 @@ func (d *KafkaDispatcher) dispatch(channelRef provisioners.ChannelReference, sub return err } +// unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. +// unsubscribe must be called under updateLock. func (d *KafkaDispatcher) unsubscribe(channel provisioners.ChannelReference, sub subscription) error { d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub)) if consumer, ok := d.kafkaConsumers[channel][sub]; ok { @@ -257,8 +292,15 @@ func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) { d.config.Store(config) } -func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger *zap.Logger) (*KafkaDispatcher, error) { +func (d *KafkaDispatcher) getHostToChannelMap() map[string]provisioners.ChannelReference { + return d.hostToChannelMap.Load().(map[string]provisioners.ChannelReference) +} +func (d *KafkaDispatcher) setHostToChannelMap(hcMap map[string]provisioners.ChannelReference) { + d.hostToChannelMap.Store(hcMap) +} + +func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger *zap.Logger) (*KafkaDispatcher, error) { conf := sarama.NewConfig() conf.Version = sarama.V1_1_0_0 conf.ClientID = controller.Name + "-dispatcher" @@ -281,16 +323,31 @@ func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger * logger: logger, } - receiverFunc := provisioners.NewMessageReceiver( + receiverFunc, err := provisioners.NewMessageReceiver( func(channel provisioners.ChannelReference, message *provisioners.Message) error { dispatcher.kafkaAsyncProducer.Input() <- toKafkaMessage(channel, message) return nil - }, logger.Sugar()) + }, + logger.Sugar(), + provisioners.ResolveChannelFromHostHeader(provisioners.ResolveChannelFromHostFunc(dispatcher.getChannelReferenceFromHost))) + if err != nil { + return nil, err + } dispatcher.receiver = receiverFunc dispatcher.setConfig(&multichannelfanout.Config{}) + dispatcher.setHostToChannelMap(map[string]provisioners.ChannelReference{}) return dispatcher, nil } +func (d *KafkaDispatcher) getChannelReferenceFromHost(host string) (provisioners.ChannelReference, error) { + chMap := d.getHostToChannelMap() + cr, ok := chMap[host] + if !ok { + return cr, fmt.Errorf("invalid Hostname:%s. Hostname not found in ConfigMap for any Channel", host) + } + return cr, nil +} + func fromKafkaMessage(kafkaMessage *sarama.ConsumerMessage) *provisioners.Message { headers := make(map[string]string) for _, header := range kafkaMessage.Headers { diff --git a/contrib/kafka/pkg/dispatcher/dispatcher_test.go b/contrib/kafka/pkg/dispatcher/dispatcher_test.go index 04e312777e4..eda73205238 100644 --- a/contrib/kafka/pkg/dispatcher/dispatcher_test.go +++ b/contrib/kafka/pkg/dispatcher/dispatcher_test.go @@ -182,23 +182,29 @@ func (c *mockSaramaCluster) GetConsumerMode() cluster.ConsumerMode { func TestDispatcher_UpdateConfig(t *testing.T) { testCases := []struct { - name string - oldConfig *multichannelfanout.Config - newConfig *multichannelfanout.Config - subscribes []string - unsubscribes []string - createErr string + name string + oldConfig *multichannelfanout.Config + newConfig *multichannelfanout.Config + subscribes []string + unsubscribes []string + createErr string + oldHostToChanMap map[string]provisioners.ChannelReference + newHostToChanMap map[string]provisioners.ChannelReference }{ { - name: "nil config", - oldConfig: &multichannelfanout.Config{}, - newConfig: nil, - createErr: "nil config", + name: "nil config", + oldConfig: &multichannelfanout.Config{}, + newConfig: nil, + createErr: "nil config", + oldHostToChanMap: map[string]provisioners.ChannelReference{}, + newHostToChanMap: map[string]provisioners.ChannelReference{}, }, { - name: "same config", - oldConfig: &multichannelfanout.Config{}, - newConfig: &multichannelfanout.Config{}, + name: "same config", + oldConfig: &multichannelfanout.Config{}, + newConfig: &multichannelfanout.Config{}, + oldHostToChanMap: map[string]provisioners.ChannelReference{}, + newHostToChanMap: map[string]provisioners.ChannelReference{}, }, { name: "config with no subscription", @@ -208,9 +214,14 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel", + HostName: "a.b.c.d", }, }, }, + oldHostToChanMap: map[string]provisioners.ChannelReference{}, + newHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel", Namespace: "default"}, + }, }, { name: "single channel w/ new subscriptions", @@ -220,6 +231,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel", + HostName: "a.b.c.d", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -235,7 +247,11 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, }, }, - subscribes: []string{"subscription-1", "subscription-2"}, + subscribes: []string{"subscription-1", "subscription-2"}, + oldHostToChanMap: map[string]provisioners.ChannelReference{}, + newHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel", Namespace: "default"}, + }, }, { name: "single channel w/ existing subscriptions", @@ -244,6 +260,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel", + HostName: "a.b.c.d", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -260,6 +277,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel", + HostName: "a.b.c.d", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -277,6 +295,12 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, subscribes: []string{"subscription-2", "subscription-3"}, unsubscribes: []string{"subscription-1"}, + oldHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel", Namespace: "default"}, + }, + newHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel", Namespace: "default"}, + }, }, { name: "multi channel w/old and new subscriptions", @@ -285,6 +309,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel-1", + HostName: "a.b.c.d", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -302,6 +327,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel-1", + HostName: "a.b.c.d", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -314,6 +340,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { { Namespace: "default", Name: "test-channel-2", + HostName: "e.f.g.h", FanoutConfig: fanout.Config{ Subscriptions: []eventingduck.ChannelSubscriberSpec{ { @@ -331,6 +358,33 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, subscribes: []string{"subscription-1", "subscription-3", "subscription-4"}, unsubscribes: []string{"subscription-2"}, + oldHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel-1", Namespace: "default"}, + }, + newHostToChanMap: map[string]provisioners.ChannelReference{ + "a.b.c.d": provisioners.ChannelReference{Name: "test-channel-1", Namespace: "default"}, + "e.f.g.h": provisioners.ChannelReference{Name: "test-channel-2", Namespace: "default"}, + }, + }, + { + name: "Duplicate hostnames", + oldConfig: &multichannelfanout.Config{}, + newConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel-1", + HostName: "a.b.c.d", + }, + { + Namespace: "default", + Name: "test-channel-2", + HostName: "a.b.c.d", + }, + }, + }, + createErr: "duplicate hostName found. Each channel must have a unique host header. HostName:a.b.c.d, channel:default.test-channel-2, channel:default.test-channel-1", + oldHostToChanMap: map[string]provisioners.ChannelReference{}, }, } @@ -344,10 +398,12 @@ func TestDispatcher_UpdateConfig(t *testing.T) { logger: zap.NewNop(), } d.setConfig(&multichannelfanout.Config{}) + d.setHostToChannelMap(map[string]provisioners.ChannelReference{}) // Initialize using oldConfig err := d.UpdateConfig(tc.oldConfig) if err != nil { + t.Errorf("unexpected error: %v", err) } oldSubscribers := sets.NewString() @@ -359,6 +415,12 @@ func TestDispatcher_UpdateConfig(t *testing.T) { if diff := sets.NewString(tc.unsubscribes...).Difference(oldSubscribers); diff.Len() != 0 { t.Errorf("subscriptions %+v were never subscribed", diff) } + if diff := cmp.Diff(tc.oldConfig, d.getConfig()); diff != "" { + t.Errorf("unexpected config (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.oldHostToChanMap, d.getHostToChannelMap()); diff != "" { + t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff) + } // Update with new config err = d.UpdateConfig(tc.newConfig) @@ -383,6 +445,12 @@ func TestDispatcher_UpdateConfig(t *testing.T) { if diff := cmp.Diff(tc.subscribes, newSubscribers, sortStrings); diff != "" { t.Errorf("unexpected subscribers (-want, +got) = %v", diff) } + if diff := cmp.Diff(tc.newHostToChanMap, d.getHostToChannelMap()); diff != "" { + t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.newConfig, d.getConfig()); diff != "" { + t.Errorf("unexpected config (-want, +got) = %v", diff) + } }) } @@ -604,9 +672,13 @@ func TestKafkaDispatcher_Start(t *testing.T) { t.Errorf("Expected error want %s, got %s", "message receiver is not set", err) } - d.receiver = provisioners.NewMessageReceiver(func(channel provisioners.ChannelReference, message *provisioners.Message) error { + receiver, err := provisioners.NewMessageReceiver(func(channel provisioners.ChannelReference, message *provisioners.Message) error { return nil }, zap.NewNop().Sugar()) + if err != nil { + t.Fatalf("Error creating new message receiver. Error:%s", err) + } + d.receiver = receiver err = d.Start(make(chan struct{})) if err == nil { t.Errorf("Expected error want %s, got %s", "kafkaAsyncProducer is not set", err) diff --git a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go index f9c8963a4f0..28a0b9ad7de 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go @@ -71,7 +71,11 @@ func NewDispatcher(natssURL, clusterID string, logger *zap.Logger) (*Subscriptio clusterID: clusterID, subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription), } - d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar()) + receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar()) + if err != nil { + return nil, err + } + d.receiver = receiver return d, nil } diff --git a/pkg/channelwatcher/channel_watcher.go b/pkg/channelwatcher/channel_watcher.go index b9a77670ce8..26b77b362b1 100644 --- a/pkg/channelwatcher/channel_watcher.go +++ b/pkg/channelwatcher/channel_watcher.go @@ -1,23 +1,37 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package channelwatcher import ( "context" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "github.com/knative/eventing/pkg/sidecar/swappable" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) -type WatchHandlerFunc func(context.Context, client.Client, types.NamespacedName) error - type reconciler struct { client client.Client logger *zap.Logger @@ -34,6 +48,7 @@ func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) return reconcile.Result{}, nil } +// New creates a new instance of Channel Watcher that watches channels and calls the watchHandler on add, update, delete and generic event func New(mgr manager.Manager, logger *zap.Logger, watchHandler WatchHandlerFunc) error { c, err := controller.New("ChannelWatcher", mgr, controller.Options{ Reconciler: &reconciler{ @@ -57,3 +72,42 @@ func New(mgr manager.Manager, logger *zap.Logger, watchHandler WatchHandlerFunc) } return nil } + +// WatchHandlerFunc is called whenever an add, update, delete or generic event is triggered on a channel +type WatchHandlerFunc func(context.Context, client.Client, types.NamespacedName) error + +// ShouldWatchFunc is called while returning list of channels. +// Channels are included in the list if the return value is true. +type ShouldWatchFunc func(ch *v1alpha1.Channel) bool + +// UpdateConfigWatchHandler is a special handler that +// 1. Lists the channels for which shouldWatch returns true. +// 2. Creates a multi-channel-fanout-config. +// 3. Calls the updateConfig func with the new multi-channel-fanout-config. +// This is used by dispatchers or receivers to update their configs by watching channels. +func UpdateConfigWatchHandler(updateConfig swappable.UpdateConfig, shouldWatch ShouldWatchFunc) WatchHandlerFunc { + return func(ctx context.Context, c client.Client, _ types.NamespacedName) error { + channels, err := listAllChannels(ctx, c, shouldWatch) + if err != nil { + logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err)) + return err + } + config := multichannelfanout.NewConfigFromChannels(channels) + return updateConfig(config) + } +} + +// listAllChannels queries client and gets list of all channels for which shouldWatch returns true. +func listAllChannels(ctx context.Context, c client.Client, shouldWatch ShouldWatchFunc) ([]v1alpha1.Channel, error) { + channels := make([]v1alpha1.Channel, 0) + cl := &v1alpha1.ChannelList{} + if err := c.List(ctx, &client.ListOptions{}, cl); err != nil { + return nil, err + } + for _, c := range cl.Items { + if c.Status.IsReady() && shouldWatch(&c) { + channels = append(channels, c) + } + } + return channels, nil +} diff --git a/pkg/channelwatcher/channel_watcher_test.go b/pkg/channelwatcher/channel_watcher_test.go new file mode 100644 index 00000000000..5ff41cec642 --- /dev/null +++ b/pkg/channelwatcher/channel_watcher_test.go @@ -0,0 +1,189 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package channelwatcher + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "github.com/knative/eventing/pkg/sidecar/swappable" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) +} + +func TestUpdateConfigWatchHandler(t *testing.T) { + tests := []struct { + name string + channels []runtime.Object + clientListError error + updateConfigError error + expectedConfig *multichannelfanout.Config + }{ + { + name: "Client list error", + clientListError: fmt.Errorf("Client list error"), + }, + { + name: "update config error", + updateConfigError: fmt.Errorf("error updating config"), + expectedConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{}, + }, + }, + { + name: "Successfully update config", + channels: []runtime.Object{ + makeChannel("chan-1", "ns-1", "e.f.g.h", makeSubscribable(makeSubscriber("sub1"), makeSubscriber("sub2"))), + makeChannel("chan-2", "ns-2", "i.j.k.l", makeSubscribable(makeSubscriber("sub3"), makeSubscriber("sub4"))), + makeChannel("chan-3", "donotwatch", "i.j.k.l", makeSubscribable(makeSubscriber("sub3"), makeSubscriber("sub4"))), + }, + expectedConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Name: "chan-1", + Namespace: "ns-1", + HostName: "e.f.g.h", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + makeSubscriber("sub1"), + makeSubscriber("sub2"), + }, + }, + }, { + Name: "chan-2", + Namespace: "ns-2", + HostName: "i.j.k.l", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + makeSubscriber("sub3"), + makeSubscriber("sub4"), + }, + }, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualConfig := ConfigHolder{} + watchHandler := UpdateConfigWatchHandler(updateConfigWrapper(&actualConfig, test.updateConfigError), shouldWatch) + mockClient := getClient(test.channels, getClientMocks(test.clientListError)) + + actualError := watchHandler(context.TODO(), mockClient, types.NamespacedName{}) + if actualError != nil { + if test.clientListError != nil { + if diff := cmp.Diff(test.clientListError.Error(), actualError.Error()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + } + if test.updateConfigError != nil { + if diff := cmp.Diff(test.updateConfigError.Error(), actualError.Error()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + } + } else { + if test.clientListError != nil { + t.Fatalf("Want error %v \n Got nil", test.clientListError) + } + if test.updateConfigError != nil { + t.Fatalf("Want error %v \n Got nil", test.updateConfigError) + } + } + if diff := cmp.Diff(test.expectedConfig, actualConfig.config); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + }) + } +} + +type ConfigHolder struct { + config *multichannelfanout.Config +} + +func shouldWatch(c *v1alpha1.Channel) bool { + if c.Namespace == "donotwatch" { + return false + } + return true +} +func updateConfigWrapper(ch *ConfigHolder, returnError error) swappable.UpdateConfig { + return func(c *multichannelfanout.Config) error { + ch.config = c + return returnError + } +} + +func getClient(objs []runtime.Object, mocks controllertesting.Mocks) *controllertesting.MockClient { + innerClient := fake.NewFakeClient(objs...) + return controllertesting.NewMockClient(innerClient, mocks) +} + +func getClientMocks(listError error) controllertesting.Mocks { + if listError != nil { + return controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, listError + }, + }, + } + } + return controllertesting.Mocks{} +} + +func makeChannel(name string, namespace string, hostname string, subscribable *eventingduck.Subscribable) *v1alpha1.Channel { + c := v1alpha1.Channel{ + Spec: v1alpha1.ChannelSpec{ + Subscribable: subscribable, + }, + } + c.Name = name + c.Namespace = namespace + c.Status.InitializeConditions() + c.Status.MarkProvisioned() + c.Status.MarkProvisionerInstalled() + c.Status.SetAddress(hostname) + return &c +} +func makeSubscribable(subsriberSpec ...eventingduck.ChannelSubscriberSpec) *eventingduck.Subscribable { + return &eventingduck.Subscribable{ + Subscribers: subsriberSpec, + } +} + +func makeSubscriber(name string) eventingduck.ChannelSubscriberSpec { + return eventingduck.ChannelSubscriberSpec{ + SubscriberURI: name + "-suburi", + ReplyURI: name + "-replyuri", + } +} diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index d31d490b139..69e1224e062 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -62,15 +62,17 @@ func AddFinalizer(o metav1.Object, finalizerName string) AddFinalizerResult { return FinalizerAdded } +// RemoveFinalizer removes the finalizer(finalizerName) from the object(o) if the finalizer is present. +// Returns: - FinalizerRemoved, if the finalizer was found and removed. +// - FinalizerNotFound, if the finalizer was not found. func RemoveFinalizer(o metav1.Object, finalizerName string) RemoveFinalizerResult { - result := FinalizerNotFound finalizers := sets.NewString(o.GetFinalizers()...) if finalizers.Has(finalizerName) { - result = FinalizerRemoved finalizers.Delete(finalizerName) o.SetFinalizers(finalizers.List()) + return FinalizerRemoved } - return result + return FinalizerNotFound } // K8sServiceOption is a functional option that can modify the K8s Service in CreateK8sService diff --git a/pkg/provisioners/message_receiver.go b/pkg/provisioners/message_receiver.go index 3874fded80b..175c796762b 100644 --- a/pkg/provisioners/message_receiver.go +++ b/pkg/provisioners/message_receiver.go @@ -32,26 +32,47 @@ const ( MessageReceiverPort = 8080 ) -// Message receiver receives messages. +// MessageReceiver receives messages. type MessageReceiver struct { - receiverFunc func(ChannelReference, *Message) error - forwardHeaders sets.String - forwardPrefixes []string + receiverFunc func(ChannelReference, *Message) error + forwardHeaders sets.String + forwardPrefixes []string + logger *zap.SugaredLogger + hostToChannelFunc ResolveChannelFromHostFunc +} + +// ReceiverOptions provides functional options to MessageReceiver function. +type ReceiverOptions func(*MessageReceiver) error - logger *zap.SugaredLogger +// ResolveChannelFromHostFunc function enables MessageReceiver to get the Channel Reference from incoming request HostHeader +// before calling receiverFunc. +type ResolveChannelFromHostFunc func(string) (ChannelReference, error) + +// ResolveChannelFromHostHeader is a ReceiverOption for NewMessageReceiver which enables the caller to overwrite the +// default behaviour defined by ParseChannel function. +func ResolveChannelFromHostHeader(hostToChannelFunc ResolveChannelFromHostFunc) ReceiverOptions { + return func(r *MessageReceiver) error { + r.hostToChannelFunc = hostToChannelFunc + return nil + } } // NewMessageReceiver creates a message receiver passing new messages to the // receiverFunc. -func NewMessageReceiver(receiverFunc func(ChannelReference, *Message) error, logger *zap.SugaredLogger) *MessageReceiver { +func NewMessageReceiver(receiverFunc func(ChannelReference, *Message) error, logger *zap.SugaredLogger, opts ...ReceiverOptions) (*MessageReceiver, error) { receiver := &MessageReceiver{ - receiverFunc: receiverFunc, - forwardHeaders: sets.NewString(forwardHeaders...), - forwardPrefixes: forwardPrefixes, - - logger: logger, + receiverFunc: receiverFunc, + forwardHeaders: sets.NewString(forwardHeaders...), + forwardPrefixes: forwardPrefixes, + hostToChannelFunc: ResolveChannelFromHostFunc(ParseChannel), + logger: logger, + } + for _, opt := range opts { + if err := opt(receiver); err != nil { + return nil, err + } } - return receiver + return receiver, nil } // Start begings to receive messages for the receiver. @@ -116,13 +137,13 @@ func (r *MessageReceiver) handler() http.Handler { func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request) { host := req.Host r.logger.Infof("Received request for %s", host) - channel, err := ParseChannel(host) + channel, err := r.hostToChannelFunc(host) if err != nil { r.logger.Info("Could not extract channel", zap.Error(err)) res.WriteHeader(http.StatusInternalServerError) return } - + r.logger.Infof("Request mapped to channel: %s", channel.String()) message, err := r.fromRequest(req) if err != nil { res.WriteHeader(http.StatusInternalServerError) diff --git a/pkg/provisioners/message_receiver_test.go b/pkg/provisioners/message_receiver_test.go index 8cd8ca9bfbe..e4ec33a718a 100644 --- a/pkg/provisioners/message_receiver_test.go +++ b/pkg/provisioners/message_receiver_test.go @@ -126,7 +126,10 @@ func TestMessageReceiver_HandleRequest(t *testing.T) { } f := tc.receiverFunc - r := NewMessageReceiver(f, zap.NewNop().Sugar()) + r, err := NewMessageReceiver(f, zap.NewNop().Sugar()) + if err != nil { + t.Fatalf("Error creating new message receiver. Error:%s", err) + } h := r.handler() body := tc.bodyReader diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index 2fd4ae97f9a..bd67bfe32c8 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -69,7 +69,7 @@ type forwardMessage struct { } // NewHandler creates a new fanout.Handler. -func NewHandler(logger *zap.Logger, config Config) *Handler { +func NewHandler(logger *zap.Logger, config Config) (*Handler, error) { handler := &Handler{ logger: logger, config: config, @@ -79,9 +79,12 @@ func NewHandler(logger *zap.Logger, config Config) *Handler { } // The receiver function needs to point back at the handler itself, so set it up after // initialization. - handler.receiver = provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) - - return handler + receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) + if err != nil { + return nil, err + } + handler.receiver = receiver + return handler, nil } func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index 03b756ca8d9..95e4752b1c6 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -225,12 +225,19 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { subs = append(subs, sub) } - h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) + h, err := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) + if err != nil { + t.Fatalf("NewHandler failed. Error:%s", err) + } if tc.asyncHandler { h.config.AsyncHandler = true } if tc.receiverFunc != nil { - h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) + receiver, err := provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) + if err != nil { + t.Fatalf("NewMessageReceiver failed. Error:%s", err) + } + h.receiver = receiver } if tc.timeout != 0 { h.timeout = tc.timeout diff --git a/pkg/sidecar/multichannelfanout/config.go b/pkg/sidecar/multichannelfanout/config.go new file mode 100644 index 00000000000..77f97a2e807 --- /dev/null +++ b/pkg/sidecar/multichannelfanout/config.go @@ -0,0 +1,57 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichannelfanout + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/sidecar/fanout" +) + +// Config for a multichannelfanout.Handler. +type Config struct { + // The configuration of each channel in this handler. + ChannelConfigs []ChannelConfig `json:"channelConfigs"` +} + +// ChannelConfig is the configuration for a single Channel. +type ChannelConfig struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + HostName string `json:"hostname"` + FanoutConfig fanout.Config `json:"fanoutConfig"` +} + +// NewConfigFromChannels creates a new Config from the list of channels. +func NewConfigFromChannels(channels []v1alpha1.Channel) *Config { + cc := make([]ChannelConfig, 0) + for _, c := range channels { + channelConfig := ChannelConfig{ + Namespace: c.Namespace, + Name: c.Name, + HostName: c.Status.Address.Hostname, + } + if c.Spec.Subscribable != nil { + channelConfig.FanoutConfig = fanout.Config{ + Subscriptions: c.Spec.Subscribable.Subscribers, + } + } + cc = append(cc, channelConfig) + } + return &Config{ + ChannelConfigs: cc, + } +} diff --git a/pkg/sidecar/multichannelfanout/config_test.go b/pkg/sidecar/multichannelfanout/config_test.go new file mode 100644 index 00000000000..e27048c4f31 --- /dev/null +++ b/pkg/sidecar/multichannelfanout/config_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichannelfanout + +import ( + "testing" + + "github.com/knative/eventing/pkg/sidecar/fanout" + + "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +) + +func TestNewConfigFromChannels(t *testing.T) { + tests := []struct { + name string + channels []v1alpha1.Channel + expected *Config + }{ + { + name: "empty channels list", + channels: []v1alpha1.Channel{}, + expected: &Config{ + ChannelConfigs: []ChannelConfig{}, + }, + }, { + name: "one channel with no subscribers", + channels: []v1alpha1.Channel{ + makeChannel("chan-1", "ns-1", "a.b.c.d", nil), + }, + expected: &Config{ + ChannelConfigs: []ChannelConfig{ + { + Name: "chan-1", + Namespace: "ns-1", + HostName: "a.b.c.d", + }, + }, + }, + }, { + name: "multiple channels with subscribers", + channels: []v1alpha1.Channel{ + makeChannel("chan-1", "ns-1", "e.f.g.h", makeSubscribable(makeSubscriber("sub1"), makeSubscriber("sub2"))), + makeChannel("chan-2", "ns-2", "i.j.k.l", makeSubscribable(makeSubscriber("sub3"), makeSubscriber("sub4"))), + }, + expected: &Config{ + ChannelConfigs: []ChannelConfig{ + { + Name: "chan-1", + Namespace: "ns-1", + HostName: "e.f.g.h", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + makeSubscriber("sub1"), + makeSubscriber("sub2"), + }, + }, + }, { + Name: "chan-2", + Namespace: "ns-2", + HostName: "i.j.k.l", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + makeSubscriber("sub3"), + makeSubscriber("sub4"), + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := NewConfigFromChannels(test.channels) + if diff := cmp.Diff(test.expected, actual); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + }) + } +} + +func makeChannel(name string, namespace string, hostname string, subscribable *eventingduck.Subscribable) v1alpha1.Channel { + c := v1alpha1.Channel{ + Spec: v1alpha1.ChannelSpec{ + Subscribable: subscribable, + }, + Status: v1alpha1.ChannelStatus{ + Address: duckv1alpha1.Addressable{ + Hostname: hostname, + }, + }, + } + c.Name = name + c.Namespace = namespace + return c +} +func makeSubscribable(subsriberSpec ...eventingduck.ChannelSubscriberSpec) *eventingduck.Subscribable { + return &eventingduck.Subscribable{ + Subscribers: subsriberSpec, + } +} + +func makeSubscriber(name string) eventingduck.ChannelSubscriberSpec { + return eventingduck.ChannelSubscriberSpec{ + SubscriberURI: name + "-suburi", + ReplyURI: name + "-replyuri", + } +} diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go index 282a1c0985d..c14cd53725c 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go @@ -34,20 +34,6 @@ import ( "go.uber.org/zap" ) -// Config for a multichannelfanout.Handler. -type Config struct { - // The configuration of each channel in this handler. - ChannelConfigs []ChannelConfig `json:"channelConfigs"` -} - -// ChannelConfig is the configuration for a single Channel. -type ChannelConfig struct { - Namespace string `json:"namespace"` - Name string `json:"name"` - HostName string `json:"hostname"` - FanoutConfig fanout.Config `json:"fanoutConfig"` -} - // makeChannelKeyFromConfig creates the channel key for a given channelConfig. It is a helper around // MakeChannelKey. func makeChannelKeyFromConfig(config ChannelConfig) string { @@ -74,7 +60,11 @@ func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) { for _, cc := range conf.ChannelConfigs { key := makeChannelKeyFromConfig(cc) - handler := fanout.NewHandler(logger, cc.FanoutConfig) + handler, err := fanout.NewHandler(logger, cc.FanoutConfig) + if err != nil { + logger.Error("Failed creating new fanout handler.", zap.Error(err)) + return nil, err + } if _, present := handlers[key]; present { logger.Error("Duplicate channel key", zap.String("channelKey", key)) return nil, fmt.Errorf("duplicate channel key: %v", key)