Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions contrib/gcppubsub/config/gcppubsub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ rules:
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
resources:
- events
verbs:
- create
- patch
- update

---

Expand Down
31 changes: 30 additions & 1 deletion contrib/gcppubsub/pkg/controller/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/reconciler/names"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/reconciler/names"
"go.uber.org/zap"
"golang.org/x/oauth2/google"
v1 "k8s.io/api/core/v1"
Expand All @@ -46,6 +46,20 @@ type persistence int
const (
persistStatus persistence = iota
noNeedToPersist

// Name of the corev1.Events emitted from the reconciliation process
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelReadStatusFailed = "ChannelReadStatusFailed"
gcpCredentialsReadFailed = "GcpCredentialsReadFailed"
gcpResourcesPlanFailed = "GcpResourcesPlanFailed"
gcpResourcesPersistFailed = "GcpResourcesPersistFailed"
virtualServiceCreateFailed = "VirtualServiceCreateFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
topicCreateFailed = "TopicCreateFailed"
topicDeleteFailed = "TopicDeleteFailed"
subscriptionSyncFailed = "SubscriptionSyncFailed"
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
)

// reconciler reconciles GCP-PubSub Channels by creating the K8s Service and Istio VirtualService
Expand Down Expand Up @@ -116,10 +130,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
logging.FromContext(ctx).Info("Error reconciling Channel", zap.Error(reconcileErr))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logging.FromContext(ctx).Info("Channel reconciled")
r.recorder.Eventf(c, v1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name)
}

if err = util.UpdateChannel(ctx, r.client, c); err != nil {
logging.FromContext(ctx).Info("Error updating Channel Status", zap.Error(err))
r.recorder.Eventf(c, v1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err)
return reconcile.Result{}, err
}

Expand Down Expand Up @@ -155,13 +173,15 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
originalPCS, err := pubsubutil.GetInternalStatus(c)
if err != nil {
logging.FromContext(ctx).Error("Unable to read the status.internal", zap.Error(err))
r.recorder.Eventf(c, v1.EventTypeWarning, channelReadStatusFailed, "Failed to read Channel's status.internal: %v", err)
return false, err
}

// Regardless of what we are going to do, we need GCP credentials to do it.
gcpCreds, err := pubsubutil.GetCredentials(ctx, r.client, r.defaultSecret, r.defaultSecretKey)
if err != nil {
logging.FromContext(ctx).Info("Unable to generate GCP creds", zap.Error(err))
r.recorder.Eventf(c, v1.EventTypeWarning, gcpCredentialsReadFailed, "Failed to generate GCP credentials: %v", err)
return false, err
}

Expand All @@ -174,10 +194,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
// Topic is nil because it is only used for sub creation, not deletion.
err = r.syncSubscriptions(ctx, originalPCS, gcpCreds, nil, subsToSync)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionSyncFailed, "Failed to sync Subscription for the Channel: %v", err)
return false, err
}
err = r.deleteTopic(ctx, originalPCS, gcpCreds)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, topicDeleteFailed, "Failed to delete Topic for the Channel: %v", err)
return false, err
}
util.RemoveFinalizer(c, finalizerName)
Expand All @@ -197,10 +219,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
// only at the status, not the spec.
persist, plannedPCS, subsToSync, err := r.planGcpResources(ctx, c, originalPCS)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, gcpResourcesPlanFailed, "Failed to plan Channel's resources: %v", err)
return false, err
}
if persist == persistStatus {
if err = pubsubutil.SetInternalStatus(ctx, c, plannedPCS); err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, gcpResourcesPersistFailed, "Failed to persist Channel's resources: %v", err)
return false, err
}
// Persist this and run another reconcile loop to enact it.
Expand All @@ -209,26 +233,31 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)

svc, err := r.createK8sService(ctx, c)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err)
return false, err
}

err = r.createVirtualService(ctx, c, svc)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return false, err
}

topic, err := r.createTopic(ctx, plannedPCS, gcpCreds)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, topicCreateFailed, "Failed to reconcile Topic for the Channel: %v", err)
return false, err
}

err = r.syncSubscriptions(ctx, plannedPCS, gcpCreds, topic, subsToSync)
if err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionSyncFailed, "Failed to reconcile Subscription for the Channel: %v", err)
return false, err
}
// Now that the subs have synced successfully, remove the old ones from the status.
plannedPCS.Subscriptions = subsToSync.subsToCreate
if err = pubsubutil.SetInternalStatus(ctx, c, plannedPCS); err != nil {
r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionDeleteFailed, "Failed to delete old Subscriptions from the Channel's status: %v", err)
return false, err
}

Expand Down
Loading