diff --git a/contrib/natss/config/broker/natss.yaml b/contrib/natss/config/broker/natss.yaml index df040e07b98..2f1bb433fbd 100644 --- a/contrib/natss/config/broker/natss.yaml +++ b/contrib/natss/config/broker/natss.yaml @@ -62,8 +62,6 @@ spec: app: nats-streaming template: metadata: - annotations: - sidecar.istio.io/inject: "true" labels: *labels spec: containers: diff --git a/contrib/natss/config/provisioner.yaml b/contrib/natss/config/provisioner.yaml index 681146deca6..6790bbe7b1d 100644 --- a/contrib/natss/config/provisioner.yaml +++ b/contrib/natss/config/provisioner.yaml @@ -55,16 +55,6 @@ rules: - watch - create - update - - apiGroups: - - networking.istio.io - resources: - - virtualservices - verbs: - - get - - list - - watch - - create - - update --- diff --git a/contrib/natss/pkg/controller/channel/controller.go b/contrib/natss/pkg/controller/channel/controller.go index 9968eedf9e2..5427bb180c1 100644 --- a/contrib/natss/pkg/controller/channel/controller.go +++ b/contrib/natss/pkg/controller/channel/controller.go @@ -24,7 +24,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" ) @@ -65,15 +64,5 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont logger.Error("Unable to watch K8s Services.", zap.Error(err)) return nil, err } - - // Watch the VirtualServices that are owned by Channels. - err = c.Watch(&source.Kind{ - Type: &istiov1alpha3.VirtualService{}, - }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) - if err != nil { - logger.Error("Unable to watch VirtualServices.", zap.Error(err)) - return nil, err - } - return c, nil } diff --git a/contrib/natss/pkg/controller/channel/reconcile.go b/contrib/natss/pkg/controller/channel/reconcile.go index 4eb9445bccd..c0f03f563d5 100644 --- a/contrib/natss/pkg/controller/channel/reconcile.go +++ b/contrib/natss/pkg/controller/channel/reconcile.go @@ -115,26 +115,19 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // We are syncing two things: // 1. The K8s Service to talk to this Channel. - // 2. The Istio VirtualService to talk to this Channel. if c.DeletionTimestamp != nil { - // K8s garbage collection will delete the K8s service and VirtualService for this channel. + // K8s garbage collection will delete the K8s service for this channel. return nil } - svc, err := provisioners.CreateK8sService(ctx, r.client, c) + svc, err := provisioners.CreateK8sService(ctx, r.client, c, provisioners.ExternalService(c)) if err != nil { r.logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) return err } c.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) - _, err = provisioners.CreateVirtualService(ctx, r.client, c, svc) - if err != nil { - r.logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) - return err - } - c.Status.MarkProvisioned() return nil } diff --git a/contrib/natss/pkg/controller/channel/reconcile_test.go b/contrib/natss/pkg/controller/channel/reconcile_test.go index 6c4cbaf144e..5ba77cad8aa 100644 --- a/contrib/natss/pkg/controller/channel/reconcile_test.go +++ b/contrib/natss/pkg/controller/channel/reconcile_test.go @@ -22,11 +22,11 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" - util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/system" _ "github.com/knative/pkg/system/testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -56,7 +56,6 @@ var ( func init() { // Add types to scheme eventingv1alpha1.AddToScheme(scheme.Scheme) - istiov1alpha3.AddToScheme(scheme.Scheme) } var testCases = []controllertesting.TestCase{ @@ -65,12 +64,12 @@ var testCases = []controllertesting.TestCase{ InitialState: []runtime.Object{ makeNewClusterChannelProvisioner(clusterChannelProvisionerName, true), makeNewChannel(channelName, clusterChannelProvisionerName), - makeVirtualService(), }, ReconcileKey: fmt.Sprintf("%s/%s", testNS, channelName), WantResult: reconcile.Result{}, WantPresent: []runtime.Object{ makeNewChannelProvisionedStatus(channelName, clusterChannelProvisionerName), + makeK8sService(channelName, clusterChannelProvisionerName), }, IgnoreTimes: true, }, @@ -214,18 +213,29 @@ func makeNewClusterChannelProvisioner(name string, isReady bool) *eventingv1alph return clusterChannelProvisioner } -func makeVirtualService() *istiov1alpha3.VirtualService { - return &istiov1alpha3.VirtualService{ +func om(namespace, name string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), + UID: testUID, + } +} + +func makeK8sService(channelName string, clusterChannelProvisionerName string) *corev1.Service { + return &corev1.Service{ TypeMeta: metav1.TypeMeta{ - APIVersion: istiov1alpha3.SchemeGroupVersion.String(), - Kind: "VirtualService", + APIVersion: "v1", + Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-channel", testNS), - Namespace: testNS, + GenerateName: fmt.Sprintf("%s-channel-", channelName), + Namespace: testNS, Labels: map[string]string{ - "channel": channelName, - "provisioner": clusterChannelProvisionerName, + provisioners.EventingChannelLabel: channelName, + provisioners.OldEventingChannelLabel: channelName, + provisioners.EventingProvisionerLabel: clusterChannelProvisionerName, + provisioners.OldEventingProvisionerLabel: clusterChannelProvisionerName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -238,32 +248,9 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, }, }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - serviceAddress, - fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()), - }, - HTTP: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()), - }, - Route: []istiov1alpha3.HTTPRouteDestination{{ - Destination: istiov1alpha3.Destination{ - Host: "kafka-provisioner.knative-eventing.svc." + utils.GetClusterDomainName(), - Port: istiov1alpha3.PortSelector{ - Number: util.PortNumber, - }, - }}, - }}, - }, + Spec: corev1.ServiceSpec{ + ExternalName: names.ServiceHostName(fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName), system.Namespace()), + Type: "ExternalName", }, } } - -func om(namespace, name string) metav1.ObjectMeta { - return metav1.ObjectMeta{ - Namespace: namespace, - Name: name, - SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), - } -} diff --git a/contrib/natss/pkg/controller/main.go b/contrib/natss/pkg/controller/main.go index 531e8901f7c..26cb9f6047a 100644 --- a/contrib/natss/pkg/controller/main.go +++ b/contrib/natss/pkg/controller/main.go @@ -24,7 +24,6 @@ import ( "github.com/knative/eventing/contrib/natss/pkg/util" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/signals" "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -48,7 +47,6 @@ func main() { // Add custom types to this array to get them into the manager's scheme. eventingv1alpha1.AddToScheme(mgr.GetScheme()) - istiov1alpha3.AddToScheme(mgr.GetScheme()) _, err = clusterchannelprovisioner.ProvideController(mgr, util.GetDefaultNatssURL(), util.GetDefaultClusterID(), logger.Desugar()) if err != nil { diff --git a/contrib/natss/pkg/dispatcher/channel/reconcile.go b/contrib/natss/pkg/dispatcher/channel/reconcile.go index dda10e27f59..f9af9763321 100644 --- a/contrib/natss/pkg/dispatcher/channel/reconcile.go +++ b/contrib/natss/pkg/dispatcher/channel/reconcile.go @@ -29,6 +29,7 @@ import ( ccpcontroller "github.com/knative/eventing/contrib/natss/pkg/controller/clusterchannelprovisioner" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/channelwatcher" ) type reconciler struct { @@ -122,5 +123,17 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err)) return false, err } + + chanList, err := channelwatcher.ListAllChannels(ctx, r.client, r.shouldReconcile) + if err != nil { + r.logger.Error("Error getting channel list", zap.Error(err)) + return false, err + } + + if err := r.subscriptionsSupervisor.UpdateHostToChannelMap(ctx, chanList); err != nil { + r.logger.Error("Error updating host to channel map", zap.Error(err)) + return false, err + } + return false, nil } diff --git a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go index 28a0b9ad7de..ecbb7d8211f 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go @@ -17,12 +17,15 @@ limitations under the License. package dispatcher import ( + "context" "encoding/json" "fmt" "sync" + "sync/atomic" "time" "github.com/knative/eventing/contrib/natss/pkg/stanutil" + "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" stan "github.com/nats-io/go-nats-streaming" "go.uber.org/zap" @@ -59,6 +62,8 @@ type SubscriptionsSupervisor struct { natssConnMux sync.Mutex natssConn *stan.Conn natssConnInProgress bool + + hostToChannelMap atomic.Value } // NewDispatcher returns a new SubscriptionsSupervisor. @@ -71,11 +76,15 @@ func NewDispatcher(natssURL, clusterID string, logger *zap.Logger) (*Subscriptio clusterID: clusterID, subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription), } - receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar()) + receiver, err := provisioners.NewMessageReceiver( + createReceiverFunction(d, logger.Sugar()), + logger.Sugar(), + provisioners.ResolveChannelFromHostHeader(provisioners.ResolveChannelFromHostFunc(d.getChannelReferenceFromHost))) if err != nil { return nil, err } d.receiver = receiver + d.setHostToChannelMap(map[string]provisioners.ChannelReference{}) return d, nil } @@ -291,3 +300,44 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere func getSubject(channel provisioners.ChannelReference) string { return channel.Name + "." + channel.Namespace } + +func (s *SubscriptionsSupervisor) getHostToChannelMap() map[string]provisioners.ChannelReference { + return s.hostToChannelMap.Load().(map[string]provisioners.ChannelReference) +} + +func (s *SubscriptionsSupervisor) setHostToChannelMap(hcMap map[string]provisioners.ChannelReference) { + s.hostToChannelMap.Store(hcMap) +} + +// UpdateHostToChannelMap will be called from the controller that watches natss channels. +// It will update internal hostToChannelMap which is used to resolve the hostHeader of the +// incoming request to the correct ChannelReference in the receiver function. +func (s *SubscriptionsSupervisor) UpdateHostToChannelMap(ctx context.Context, chanList []eventingv1alpha1.Channel) error { + hostToChanMap := make(map[string]provisioners.ChannelReference, len(chanList)) + for _, c := range chanList { + hostName := c.Status.Address.Hostname + if cr, ok := hostToChanMap[hostName]; ok { + return fmt.Errorf( + "Duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", + hostName, + c.Namespace, + c.Name, + cr.Namespace, + cr.Name) + } + hostToChanMap[hostName] = provisioners.ChannelReference{Name: c.Name, Namespace: c.Namespace} + } + + s.setHostToChannelMap(hostToChanMap) + logging.FromContext(ctx).Info("hostToChannelMap updated successfully.") + return nil +} + +func (s *SubscriptionsSupervisor) getChannelReferenceFromHost(host string) (provisioners.ChannelReference, error) { + chMap := s.getHostToChannelMap() + cr, ok := chMap[host] + if !ok { + return cr, fmt.Errorf("Invalid HostName:%q. HostName not found in any of the watched natss channels", host) + } + return cr, nil +} diff --git a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go index 54dab1b13b1..5df19d94d44 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go +++ b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go @@ -17,11 +17,13 @@ limitations under the License. package dispatcher import ( + "context" "encoding/json" "os" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/contrib/natss/pkg/stanutil" "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -199,6 +201,68 @@ func TestUpdateSubscriptions(t *testing.T) { } } +func TestUpdateHostToChannelMap(t *testing.T) { + tests := []struct { + name string + chanList []eventingv1alpha1.Channel + expected map[string]provisioners.ChannelReference + expectedErrorString string + }{ + { + name: "Empty channel list", + expected: map[string]provisioners.ChannelReference{}, + }, { + name: "Duplicate host name", + chanList: []eventingv1alpha1.Channel{ + *makechannel("chan1", "ns1", "host1"), + *makechannel("chan2", "ns2", "host2"), + *makechannel("chan3", "ns3", "host2"), + }, + expected: map[string]provisioners.ChannelReference{}, + expectedErrorString: "Duplicate hostName found. Each channel must have a unique host header. HostName:host2, channel:ns3.chan3, channel:ns2.chan2", + }, { + name: "Valid list of channels", + chanList: []eventingv1alpha1.Channel{ + *makechannel("chan1", "ns1", "host1"), + *makechannel("chan2", "ns2", "host2"), + *makechannel("chan3", "ns3", "host3"), + }, + expected: map[string]provisioners.ChannelReference{ + "host1": provisioners.ChannelReference{Name: "chan1", Namespace: "ns1"}, + "host2": provisioners.ChannelReference{Name: "chan2", Namespace: "ns2"}, + "host3": provisioners.ChannelReference{Name: "chan3", Namespace: "ns3"}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s.setHostToChannelMap(map[string]provisioners.ChannelReference{}) + err := s.UpdateHostToChannelMap(context.TODO(), test.chanList) + + if err != nil { + if diff := cmp.Diff(test.expectedErrorString, err.Error()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + } + + if diff := cmp.Diff(test.expected, s.getHostToChannelMap()); diff != "" { + t.Fatalf("Unexpected difference (-want +got): %v", diff) + } + }) + } +} + +func makechannel(name string, namespace string, hostname string) *eventingv1alpha1.Channel { + c := eventingv1alpha1.Channel{} + c.Name = name + c.Namespace = namespace + c.Status.InitializeConditions() + c.Status.MarkProvisioned() + c.Status.MarkProvisionerInstalled() + c.Status.SetAddress(hostname) + return &c +} + func startNatss() (*server.StanServer, error) { logger.Infof("Start NATSS") var ( diff --git a/pkg/channelwatcher/channel_watcher.go b/pkg/channelwatcher/channel_watcher.go index 26b77b362b1..249fe2ef439 100644 --- a/pkg/channelwatcher/channel_watcher.go +++ b/pkg/channelwatcher/channel_watcher.go @@ -87,7 +87,7 @@ type ShouldWatchFunc func(ch *v1alpha1.Channel) bool // This is used by dispatchers or receivers to update their configs by watching channels. func UpdateConfigWatchHandler(updateConfig swappable.UpdateConfig, shouldWatch ShouldWatchFunc) WatchHandlerFunc { return func(ctx context.Context, c client.Client, _ types.NamespacedName) error { - channels, err := listAllChannels(ctx, c, shouldWatch) + channels, err := ListAllChannels(ctx, c, shouldWatch) if err != nil { logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err)) return err @@ -97,8 +97,8 @@ func UpdateConfigWatchHandler(updateConfig swappable.UpdateConfig, shouldWatch S } } -// listAllChannels queries client and gets list of all channels for which shouldWatch returns true. -func listAllChannels(ctx context.Context, c client.Client, shouldWatch ShouldWatchFunc) ([]v1alpha1.Channel, error) { +// ListAllChannels queries client and gets list of all channels for which shouldWatch returns true. +func ListAllChannels(ctx context.Context, c client.Client, shouldWatch ShouldWatchFunc) ([]v1alpha1.Channel, error) { channels := make([]v1alpha1.Channel, 0) cl := &v1alpha1.ChannelList{} if err := c.List(ctx, &client.ListOptions{}, cl); err != nil {