Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
99fddec
WIP
akashrv Apr 5, 2019
c08546a
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 5, 2019
c642cea
WIP - In-memory working with E2E tests
akashrv Apr 6, 2019
2a4faae
WIP - remove istio dependency from in-memroy channel
akashrv Apr 9, 2019
bd7ae68
UTs pass, E2E tests pass with in-memory as well as kafka
akashrv Apr 10, 2019
df4487f
fixed uts that failed due to last K8s service change
akashrv Apr 10, 2019
824efe6
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 10, 2019
23ae8b4
Removed unnecessary space from a line
akashrv Apr 10, 2019
a04d6d9
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 10, 2019
bb7ab3e
dding istio annotation to test POD. This will ve needed when running E2E
akashrv Apr 10, 2019
83e519d
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 10, 2019
c646dcd
Bug fix to set clusterIp of K8s service only when it is not of type E…
akashrv Apr 11, 2019
485f6b3
WIP kafka channel
akashrv Apr 11, 2019
6fd3378
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 11, 2019
37bae81
WIP kafka - UTs and E2E pass
akashrv Apr 12, 2019
feb5e64
Updated code based on PR comments
akashrv Apr 12, 2019
3414ba3
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 12, 2019
c1b8581
WIP
akashrv Apr 15, 2019
d2c831f
Updates based on PR comments
akashrv Apr 15, 2019
d61cd1a
Merge branch 'master' of github.com:knative/eventing into noistio2
akashrv Apr 15, 2019
16a6ffc
Updates based on PR comments
akashrv Apr 15, 2019
67611dc
Fixed UTs
akashrv Apr 15, 2019
2cc8525
Updated VENDOR_LICENSE
akashrv Apr 15, 2019
5b68a3f
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 15, 2019
3b3f16f
WIP. Update fanout sidecar
akashrv Apr 15, 2019
f1904fe
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 15, 2019
f065d22
Merge from upstream master
akashrv Apr 15, 2019
a645dce
UTs pass, ITs passed. COde ready for PR
akashrv Apr 16, 2019
fdc4b57
Update natss to not use ISTIO. UTs and E2E tests pass.
akashrv Apr 16, 2019
09e4dfa
Updates based on PR comments
akashrv Apr 16, 2019
2f83359
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 16, 2019
a0d247a
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 16, 2019
eb76bcd
REmoved permission to istio virtual service from controller
akashrv Apr 16, 2019
d1a1bd5
Changes based on PR comments
akashrv Apr 18, 2019
2d52ee9
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 18, 2019
d71fecf
Added back permission that was removed by mistake
akashrv Apr 18, 2019
6f5d4f8
Remove istio references
akashrv Apr 18, 2019
10eeec5
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 18, 2019
0f66d68
WIP
akashrv Apr 19, 2019
9f53403
Removed one more reference of istio
akashrv Apr 19, 2019
5425625
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
85d51ca
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
98f6ff9
Revert kafka.yaml local change
akashrv Apr 19, 2019
dfd9dd6
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 19, 2019
794d92d
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
338045e
Revert kafka dispatcher change
akashrv Apr 19, 2019
cd26f6f
Removing Mutex. No need to use Mutex when using atomic value for host…
akashrv Apr 19, 2019
1f018ba
Minor updates based on PR comments
akashrv Apr 20, 2019
b3c1996
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 22, 2019
a87f5f6
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions contrib/natss/config/broker/natss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ spec:
app: nats-streaming
template:
metadata:
annotations:
Comment thread
akashrv marked this conversation as resolved.
sidecar.istio.io/inject: "true"
labels: *labels
spec:
containers:
Expand Down
10 changes: 0 additions & 10 deletions contrib/natss/config/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ rules:
- watch
- create
- update
- apiGroups:
- networking.istio.io
resources:
- virtualservices
verbs:
- get
- list
- watch
- create
- update

---

Expand Down
11 changes: 0 additions & 11 deletions contrib/natss/pkg/controller/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -65,15 +64,5 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont
logger.Error("Unable to watch K8s Services.", zap.Error(err))
return nil, err
}

// Watch the VirtualServices that are owned by Channels.
err = c.Watch(&source.Kind{
Type: &istiov1alpha3.VirtualService{},
}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true})
if err != nil {
logger.Error("Unable to watch VirtualServices.", zap.Error(err))
return nil, err
}

return c, nil
}
11 changes: 2 additions & 9 deletions contrib/natss/pkg/controller/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,19 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)

// We are syncing two things:
// 1. The K8s Service to talk to this Channel.
// 2. The Istio VirtualService to talk to this Channel.

if c.DeletionTimestamp != nil {
// K8s garbage collection will delete the K8s service and VirtualService for this channel.
// K8s garbage collection will delete the K8s service for this channel.
return nil
}

svc, err := provisioners.CreateK8sService(ctx, r.client, c)
svc, err := provisioners.CreateK8sService(ctx, r.client, c, provisioners.ExternalService(c))
if err != nil {
r.logger.Info("Error creating the Channel's K8s Service", zap.Error(err))
return err
}
c.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace))

_, err = provisioners.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
r.logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
return err
}

c.Status.MarkProvisioned()
return nil
}
Expand Down
63 changes: 25 additions & 38 deletions contrib/natss/pkg/controller/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/reconciler/names"
controllertesting "github.com/knative/eventing/pkg/reconciler/testing"
"github.com/knative/eventing/pkg/utils"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"github.com/knative/pkg/system"
_ "github.com/knative/pkg/system/testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -56,7 +56,6 @@ var (
func init() {
// Add types to scheme
eventingv1alpha1.AddToScheme(scheme.Scheme)
istiov1alpha3.AddToScheme(scheme.Scheme)
}

var testCases = []controllertesting.TestCase{
Expand All @@ -65,12 +64,12 @@ var testCases = []controllertesting.TestCase{
InitialState: []runtime.Object{
makeNewClusterChannelProvisioner(clusterChannelProvisionerName, true),
makeNewChannel(channelName, clusterChannelProvisionerName),
makeVirtualService(),
},
ReconcileKey: fmt.Sprintf("%s/%s", testNS, channelName),
WantResult: reconcile.Result{},
WantPresent: []runtime.Object{
makeNewChannelProvisionedStatus(channelName, clusterChannelProvisionerName),
makeK8sService(channelName, clusterChannelProvisionerName),
},
IgnoreTimes: true,
},
Expand Down Expand Up @@ -214,18 +213,29 @@ func makeNewClusterChannelProvisioner(name string, isReady bool) *eventingv1alph
return clusterChannelProvisioner
}

func makeVirtualService() *istiov1alpha3.VirtualService {
return &istiov1alpha3.VirtualService{
func om(namespace, name string) metav1.ObjectMeta {
return metav1.ObjectMeta{
Namespace: namespace,
Name: name,
SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name),
UID: testUID,
}
}

func makeK8sService(channelName string, clusterChannelProvisionerName string) *corev1.Service {
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: istiov1alpha3.SchemeGroupVersion.String(),
Kind: "VirtualService",
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-channel", testNS),
Namespace: testNS,
GenerateName: fmt.Sprintf("%s-channel-", channelName),
Namespace: testNS,
Labels: map[string]string{
"channel": channelName,
"provisioner": clusterChannelProvisionerName,
provisioners.EventingChannelLabel: channelName,
provisioners.OldEventingChannelLabel: channelName,
provisioners.EventingProvisionerLabel: clusterChannelProvisionerName,
provisioners.OldEventingProvisionerLabel: clusterChannelProvisionerName,
},
OwnerReferences: []metav1.OwnerReference{
{
Expand All @@ -238,32 +248,9 @@ func makeVirtualService() *istiov1alpha3.VirtualService {
},
},
},
Spec: istiov1alpha3.VirtualServiceSpec{
Hosts: []string{
serviceAddress,
fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()),
},
HTTP: []istiov1alpha3.HTTPRoute{{
Rewrite: &istiov1alpha3.HTTPRewrite{
Authority: fmt.Sprintf("%s.%s.channels.%s", channelName, testNS, utils.GetClusterDomainName()),
},
Route: []istiov1alpha3.HTTPRouteDestination{{
Destination: istiov1alpha3.Destination{
Host: "kafka-provisioner.knative-eventing.svc." + utils.GetClusterDomainName(),
Port: istiov1alpha3.PortSelector{
Number: util.PortNumber,
},
}},
}},
},
Spec: corev1.ServiceSpec{
ExternalName: names.ServiceHostName(fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName), system.Namespace()),
Type: "ExternalName",
},
}
}

func om(namespace, name string) metav1.ObjectMeta {
return metav1.ObjectMeta{
Namespace: namespace,
Name: name,
SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name),
}
}
2 changes: 0 additions & 2 deletions contrib/natss/pkg/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/knative/eventing/contrib/natss/pkg/util"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -48,7 +47,6 @@ func main() {

// Add custom types to this array to get them into the manager's scheme.
eventingv1alpha1.AddToScheme(mgr.GetScheme())
istiov1alpha3.AddToScheme(mgr.GetScheme())

_, err = clusterchannelprovisioner.ProvideController(mgr, util.GetDefaultNatssURL(), util.GetDefaultClusterID(), logger.Desugar())
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions contrib/natss/pkg/dispatcher/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

ccpcontroller "github.com/knative/eventing/contrib/natss/pkg/controller/clusterchannelprovisioner"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
)

type reconciler struct {
Expand Down Expand Up @@ -122,5 +123,17 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err))
return false, err
}

chanList, err := channelwatcher.ListAllChannels(ctx, r.client, r.shouldReconcile)
if err != nil {
r.logger.Error("Error getting channel list", zap.Error(err))
return false, err
}

if err := r.subscriptionsSupervisor.UpdateHostToChannelMap(ctx, chanList); err != nil {
r.logger.Error("Error updating host to channel map", zap.Error(err))
return false, err
}

return false, nil
}
52 changes: 51 additions & 1 deletion contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package dispatcher

import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/knative/eventing/contrib/natss/pkg/stanutil"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/provisioners"
stan "github.com/nats-io/go-nats-streaming"
"go.uber.org/zap"
Expand Down Expand Up @@ -59,6 +62,8 @@ type SubscriptionsSupervisor struct {
natssConnMux sync.Mutex
natssConn *stan.Conn
natssConnInProgress bool

hostToChannelMap atomic.Value
}

// NewDispatcher returns a new SubscriptionsSupervisor.
Expand All @@ -71,11 +76,15 @@ func NewDispatcher(natssURL, clusterID string, logger *zap.Logger) (*Subscriptio
clusterID: clusterID,
subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription),
}
receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar())
receiver, err := provisioners.NewMessageReceiver(
createReceiverFunction(d, logger.Sugar()),
logger.Sugar(),
provisioners.ResolveChannelFromHostHeader(provisioners.ResolveChannelFromHostFunc(d.getChannelReferenceFromHost)))
if err != nil {
return nil, err
}
d.receiver = receiver
d.setHostToChannelMap(map[string]provisioners.ChannelReference{})
Comment thread
akashrv marked this conversation as resolved.

return d, nil
}
Expand Down Expand Up @@ -291,3 +300,44 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere
func getSubject(channel provisioners.ChannelReference) string {
return channel.Name + "." + channel.Namespace
}

func (s *SubscriptionsSupervisor) getHostToChannelMap() map[string]provisioners.ChannelReference {
return s.hostToChannelMap.Load().(map[string]provisioners.ChannelReference)
}

func (s *SubscriptionsSupervisor) setHostToChannelMap(hcMap map[string]provisioners.ChannelReference) {
s.hostToChannelMap.Store(hcMap)
}

// UpdateHostToChannelMap will be called from the controller that watches natss channels.
// It will update internal hostToChannelMap which is used to resolve the hostHeader of the
// incoming request to the correct ChannelReference in the receiver function.
func (s *SubscriptionsSupervisor) UpdateHostToChannelMap(ctx context.Context, chanList []eventingv1alpha1.Channel) error {
hostToChanMap := make(map[string]provisioners.ChannelReference, len(chanList))
for _, c := range chanList {
hostName := c.Status.Address.Hostname
if cr, ok := hostToChanMap[hostName]; ok {
return fmt.Errorf(
"Duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s",
hostName,
c.Namespace,
c.Name,
cr.Namespace,
cr.Name)
}
hostToChanMap[hostName] = provisioners.ChannelReference{Name: c.Name, Namespace: c.Namespace}
}

s.setHostToChannelMap(hostToChanMap)
logging.FromContext(ctx).Info("hostToChannelMap updated successfully.")
return nil
}

func (s *SubscriptionsSupervisor) getChannelReferenceFromHost(host string) (provisioners.ChannelReference, error) {
chMap := s.getHostToChannelMap()
cr, ok := chMap[host]
if !ok {
return cr, fmt.Errorf("Invalid HostName:%q. HostName not found in any of the watched natss channels", host)
}
return cr, nil
}
Loading