Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
99fddec
WIP
akashrv Apr 5, 2019
c08546a
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 5, 2019
c642cea
WIP - In-memory working with E2E tests
akashrv Apr 6, 2019
2a4faae
WIP - remove istio dependency from in-memroy channel
akashrv Apr 9, 2019
bd7ae68
UTs pass, E2E tests pass with in-memory as well as kafka
akashrv Apr 10, 2019
df4487f
fixed uts that failed due to last K8s service change
akashrv Apr 10, 2019
824efe6
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 10, 2019
23ae8b4
Removed unnecessary space from a line
akashrv Apr 10, 2019
a04d6d9
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 10, 2019
bb7ab3e
dding istio annotation to test POD. This will ve needed when running E2E
akashrv Apr 10, 2019
83e519d
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 10, 2019
c646dcd
Bug fix to set clusterIp of K8s service only when it is not of type E…
akashrv Apr 11, 2019
485f6b3
WIP kafka channel
akashrv Apr 11, 2019
6fd3378
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 11, 2019
37bae81
WIP kafka - UTs and E2E pass
akashrv Apr 12, 2019
feb5e64
Updated code based on PR comments
akashrv Apr 12, 2019
3414ba3
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 12, 2019
c1b8581
WIP
akashrv Apr 15, 2019
d2c831f
Updates based on PR comments
akashrv Apr 15, 2019
d61cd1a
Merge branch 'master' of github.com:knative/eventing into noistio2
akashrv Apr 15, 2019
16a6ffc
Updates based on PR comments
akashrv Apr 15, 2019
67611dc
Fixed UTs
akashrv Apr 15, 2019
2cc8525
Updated VENDOR_LICENSE
akashrv Apr 15, 2019
5b68a3f
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 15, 2019
3b3f16f
WIP. Update fanout sidecar
akashrv Apr 15, 2019
f1904fe
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 15, 2019
f065d22
Merge from upstream master
akashrv Apr 15, 2019
a645dce
UTs pass, ITs passed. COde ready for PR
akashrv Apr 16, 2019
fdc4b57
Update natss to not use ISTIO. UTs and E2E tests pass.
akashrv Apr 16, 2019
09e4dfa
Updates based on PR comments
akashrv Apr 16, 2019
2f83359
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 16, 2019
a0d247a
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 16, 2019
eb76bcd
REmoved permission to istio virtual service from controller
akashrv Apr 16, 2019
1de9e38
WIP
akashrv Apr 17, 2019
d1a1bd5
Changes based on PR comments
akashrv Apr 18, 2019
2d52ee9
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 18, 2019
d71fecf
Added back permission that was removed by mistake
akashrv Apr 18, 2019
d41ecad
Merge branch 'noistiogcp' of github.com:akashrv/eventing into noistiogcp
akashrv Apr 18, 2019
e3e175c
WIP
akashrv Apr 18, 2019
9adebd7
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 18, 2019
6f5d4f8
Remove istio references
akashrv Apr 18, 2019
10eeec5
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 18, 2019
0f66d68
WIP
akashrv Apr 19, 2019
9f53403
Removed one more reference of istio
akashrv Apr 19, 2019
5425625
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
6ce84ee
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
85d51ca
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
98f6ff9
Revert kafka.yaml local change
akashrv Apr 19, 2019
07995dd
WIP
akashrv Apr 19, 2019
dfd9dd6
Merge branch 'master' of github.com:knative/eventing into noistionatss
akashrv Apr 19, 2019
794d92d
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
338045e
Revert kafka dispatcher change
akashrv Apr 19, 2019
cd26f6f
Removing Mutex. No need to use Mutex when using atomic value for host…
akashrv Apr 19, 2019
d0514ca
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 19, 2019
cdf44a0
Merge branch 'noistionatss' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
9372a11
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 22, 2019
d98920a
Removed named port from GCP dispatcher K8s service
akashrv Apr 22, 2019
4b1513c
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 23, 2019
34e8fd5
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 24, 2019
dae0877
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 25, 2019
7a45499
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 25, 2019
8c47946
WIP
akashrv Apr 25, 2019
34aa1ba
FInal changes before validating E2E tests
akashrv Apr 25, 2019
d799c3a
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 25, 2019
9724578
Updates based on PR comments
akashrv Apr 26, 2019
4d58944
Merge branch 'master' of github.com:knative/eventing into noistiogcp
akashrv Apr 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions config/200-controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ rules:
- "patch"
- "watch"

# Channels and Triggers both manipulate VirtualServices.
- apiGroups:
- "networking.istio.io"
resources:
- "virtualservices"
verbs: *everything

# Brokers and the namespace annotation controllers manipulate Deployments.
- apiGroups:
- "apps"
Expand Down
17 changes: 2 additions & 15 deletions contrib/gcppubsub/config/gcppubsub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,6 @@ rules:
verbs:
- create
- update
- apiGroups:
- networking.istio.io
resources:
- virtualservices
verbs:
- get
- list
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
resources:
Expand Down Expand Up @@ -240,15 +230,12 @@ spec:
clusterChannelProvisioner: gcp-pubsub
role: dispatcher
ports:
- name: http
protocol: TCP
- protocol: TCP
port: 80
targetPort: 8080

---

# Needed by the GCP PubSub Channel to communicate with GCP PubSub.
Comment thread
akashrv marked this conversation as resolved.

# Needed by the GCP PubSub Channel to communicate with GCP PubSub.
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
Expand Down
10 changes: 0 additions & 10 deletions contrib/gcppubsub/pkg/controller/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package channel
import (
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -74,15 +73,6 @@ func ProvideController(defaultGcpProject string, defaultSecret *corev1.ObjectRef
return nil, err
}

// Watch the VirtualServices that are owned by Channels.
err = c.Watch(&source.Kind{
Type: &istiov1alpha3.VirtualService{},
}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true})
if err != nil {
logger.Error("Unable to watch VirtualServices.", zap.Error(err))
return nil, err
}

return c, nil
}
}
63 changes: 23 additions & 40 deletions contrib/gcppubsub/pkg/controller/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package channel
import (
"context"
"fmt"
"github.com/knative/eventing/pkg/apis/duck/v1alpha1"

ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner"
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logging"
util "github.com/knative/eventing/pkg/provisioners"
Expand All @@ -49,21 +49,20 @@ const (
noNeedToPersist

// Name of the corev1.Events emitted from the reconciliation process
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelReadStatusFailed = "ChannelReadStatusFailed"
gcpCredentialsReadFailed = "GcpCredentialsReadFailed"
gcpResourcesPlanFailed = "GcpResourcesPlanFailed"
gcpResourcesPersistFailed = "GcpResourcesPersistFailed"
virtualServiceCreateFailed = "VirtualServiceCreateFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
topicCreateFailed = "TopicCreateFailed"
topicDeleteFailed = "TopicDeleteFailed"
subscriptionSyncFailed = "SubscriptionSyncFailed"
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelReadStatusFailed = "ChannelReadStatusFailed"
gcpCredentialsReadFailed = "GcpCredentialsReadFailed"
gcpResourcesPlanFailed = "GcpResourcesPlanFailed"
gcpResourcesPersistFailed = "GcpResourcesPersistFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
topicCreateFailed = "TopicCreateFailed"
topicDeleteFailed = "TopicDeleteFailed"
subscriptionSyncFailed = "SubscriptionSyncFailed"
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
)

// reconciler reconciles GCP-PubSub Channels by creating the K8s Service and Istio VirtualService
// reconciler reconciles GCP-PubSub Channels by creating the K8s Service (ExternalName)
// allowing other processes to send data to them. It also creates the GCP PubSub Topics (one per
// Channel) and GCP PubSub Subscriptions (one per Subscriber).
type reconciler struct {
Expand Down Expand Up @@ -116,7 +115,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
}

// Does this Controller control this Channel?
if !r.shouldReconcile(c) {
if !ShouldReconcile(c) {
logging.FromContext(ctx).Info("Not reconciling Channel, it is not controlled by this Controller", zap.Any("ref", c.Spec))
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -147,9 +146,9 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
}, reconcileErr
}

// shouldReconcile determines if this Controller should control (and therefore reconcile) a given
// ShouldReconcile determines if this Controller should control (and therefore reconcile) a given
// Channel. This Controller only handles gcp-pubsub channels.
func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool {
func ShouldReconcile(c *eventingv1alpha1.Channel) bool {
if c.Spec.Provisioner != nil {
return ccpcontroller.IsControlled(c.Spec.Provisioner)
}
Expand All @@ -162,11 +161,10 @@ func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool {
func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) (bool, error) {
c.Status.InitializeConditions()

// We are syncing four things:
// 1. The K8s Service to talk to this Channel.
// 2. The Istio VirtualService to talk to this Channel.
// 3. The GCP PubSub Topic (one for the Channel).
// 4. The GCP PubSub Subscriptions (one for each Subscriber of the Channel).
// We are syncing the following:
// - The K8s Service to talk to this Channel.
// - The GCP PubSub Topic (one for the Channel).
// - The GCP PubSub Subscriptions (one for each Subscriber of the Channel).

// First we will plan all the names out for steps 3 and 4 persist them to status.internal. Then, on a
// subsequent reconcile, we manipulate all the GCP resources in steps 3 and 4.
Expand All @@ -187,7 +185,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
}

if c.DeletionTimestamp != nil {
// K8s garbage collection will delete the K8s service and VirtualService for this channel.
// K8s garbage collection will delete the K8s service for this channel.
// All the subs should be deleted.
subsToSync := &syncSubs{
subsToDelete: originalPCS.Subscriptions,
Expand Down Expand Up @@ -232,18 +230,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
return true, nil
}

svc, err := r.createK8sService(ctx, c)
_, err = r.createK8sService(ctx, c)
if err != nil {
r.recorder.Eventf(c, corev1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err)
return false, err
}

err = r.createVirtualService(ctx, c, svc)
if err != nil {
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return false, err
}

topic, err := r.createTopic(ctx, plannedPCS, gcpCreds)
if err != nil {
r.recorder.Eventf(c, corev1.EventTypeWarning, topicCreateFailed, "Failed to reconcile Topic for the Channel: %v", err)
Expand Down Expand Up @@ -360,7 +352,7 @@ func (r *reconciler) planGcpResources(ctx context.Context, c *eventingv1alpha1.C
}

func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
svc, err := util.CreateK8sService(ctx, r.client, c)
svc, err := util.CreateK8sService(ctx, r.client, c, util.ExternalService(c))
if err != nil {
logging.FromContext(ctx).Info("Error creating the Channel's K8s Service", zap.Error(err))
return nil, err
Expand All @@ -370,15 +362,6 @@ func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.C
return svc, nil
}

func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel, svc *corev1.Service) error {
_, err := util.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
logging.FromContext(ctx).Info("Error creating the Virtual Service for the Channel", zap.Error(err))
return err
}
return nil
}

func (r *reconciler) createTopic(ctx context.Context, plannedPCS *pubsubutil.GcpPubSubChannelStatus, gcpCreds *google.Credentials) (pubsubutil.PubSubTopic, error) {
psc, err := r.pubSubClientCreator(ctx, gcpCreds, plannedPCS.GCPProject)
if err != nil {
Expand Down
Loading