diff --git a/contrib/gcppubsub/config/gcppubsub.yaml b/contrib/gcppubsub/config/gcppubsub.yaml index 4dcb04899ac..02359d6d10c 100644 --- a/contrib/gcppubsub/config/gcppubsub.yaml +++ b/contrib/gcppubsub/config/gcppubsub.yaml @@ -71,6 +71,14 @@ rules: - watch - create - update + - apiGroups: + - "" # Core API Group. + resources: + - events + verbs: + - create + - patch + - update --- diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile.go b/contrib/gcppubsub/pkg/controller/channel/reconcile.go index 63eccb1ca7c..f1f6c384575 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile.go @@ -24,8 +24,8 @@ import ( pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/reconciler/names" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/names" "go.uber.org/zap" "golang.org/x/oauth2/google" v1 "k8s.io/api/core/v1" @@ -46,6 +46,20 @@ type persistence int const ( persistStatus persistence = iota noNeedToPersist + + // Name of the corev1.Events emitted from the reconciliation process + channelReconciled = "ChannelReconciled" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" + channelReadStatusFailed = "ChannelReadStatusFailed" + gcpCredentialsReadFailed = "GcpCredentialsReadFailed" + gcpResourcesPlanFailed = "GcpResourcesPlanFailed" + gcpResourcesPersistFailed = "GcpResourcesPersistFailed" + virtualServiceCreateFailed = "VirtualServiceCreateFailed" + k8sServiceCreateFailed = "K8sServiceCreateFailed" + topicCreateFailed = "TopicCreateFailed" + topicDeleteFailed = "TopicDeleteFailed" + subscriptionSyncFailed = "SubscriptionSyncFailed" + subscriptionDeleteFailed = "SubscriptionDeleteFailed" ) // reconciler reconciles GCP-PubSub Channels by creating the K8s Service and Istio VirtualService @@ -116,10 +130,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err logging.FromContext(ctx).Info("Error reconciling Channel", zap.Error(reconcileErr)) // Note that we do not return the error here, because we want to update the Status // regardless of the error. + } else { + logging.FromContext(ctx).Info("Channel reconciled") + r.recorder.Eventf(c, v1.EventTypeNormal, channelReconciled, "Channel reconciled: %q", c.Name) } if err = util.UpdateChannel(ctx, r.client, c); err != nil { logging.FromContext(ctx).Info("Error updating Channel Status", zap.Error(err)) + r.recorder.Eventf(c, v1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err) return reconcile.Result{}, err } @@ -155,6 +173,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) originalPCS, err := pubsubutil.GetInternalStatus(c) if err != nil { logging.FromContext(ctx).Error("Unable to read the status.internal", zap.Error(err)) + r.recorder.Eventf(c, v1.EventTypeWarning, channelReadStatusFailed, "Failed to read Channel's status.internal: %v", err) return false, err } @@ -162,6 +181,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) gcpCreds, err := pubsubutil.GetCredentials(ctx, r.client, r.defaultSecret, r.defaultSecretKey) if err != nil { logging.FromContext(ctx).Info("Unable to generate GCP creds", zap.Error(err)) + r.recorder.Eventf(c, v1.EventTypeWarning, gcpCredentialsReadFailed, "Failed to generate GCP credentials: %v", err) return false, err } @@ -174,10 +194,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // Topic is nil because it is only used for sub creation, not deletion. err = r.syncSubscriptions(ctx, originalPCS, gcpCreds, nil, subsToSync) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionSyncFailed, "Failed to sync Subscription for the Channel: %v", err) return false, err } err = r.deleteTopic(ctx, originalPCS, gcpCreds) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, topicDeleteFailed, "Failed to delete Topic for the Channel: %v", err) return false, err } util.RemoveFinalizer(c, finalizerName) @@ -197,10 +219,12 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // only at the status, not the spec. persist, plannedPCS, subsToSync, err := r.planGcpResources(ctx, c, originalPCS) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, gcpResourcesPlanFailed, "Failed to plan Channel's resources: %v", err) return false, err } if persist == persistStatus { if err = pubsubutil.SetInternalStatus(ctx, c, plannedPCS); err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, gcpResourcesPersistFailed, "Failed to persist Channel's resources: %v", err) return false, err } // Persist this and run another reconcile loop to enact it. @@ -209,26 +233,31 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) svc, err := r.createK8sService(ctx, c) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, k8sServiceCreateFailed, "Failed to reconcile Channel's K8s Service: %v", err) return false, err } err = r.createVirtualService(ctx, c, svc) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err) return false, err } topic, err := r.createTopic(ctx, plannedPCS, gcpCreds) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, topicCreateFailed, "Failed to reconcile Topic for the Channel: %v", err) return false, err } err = r.syncSubscriptions(ctx, plannedPCS, gcpCreds, topic, subsToSync) if err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionSyncFailed, "Failed to reconcile Subscription for the Channel: %v", err) return false, err } // Now that the subs have synced successfully, remove the old ones from the status. plannedPCS.Subscriptions = subsToSync.subsToCreate if err = pubsubutil.SetInternalStatus(ctx, c, plannedPCS); err != nil { + r.recorder.Eventf(c, v1.EventTypeWarning, subscriptionDeleteFailed, "Failed to delete old Subscriptions from the Channel's status: %v", err) return false, err } diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go index d74d5316b85..03ac497dca0 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go @@ -86,6 +86,22 @@ var ( }, }, } + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + channelReconciled: {Reason: channelReconciled, Type: corev1.EventTypeNormal}, + channelUpdateStatusFailed: {Reason: channelUpdateStatusFailed, Type: corev1.EventTypeWarning}, + channelReadStatusFailed: {Reason: channelReadStatusFailed, Type: corev1.EventTypeWarning}, + gcpCredentialsReadFailed: {Reason: gcpCredentialsReadFailed, Type: corev1.EventTypeWarning}, + gcpResourcesPlanFailed: {Reason: gcpResourcesPlanFailed, Type: corev1.EventTypeWarning}, + gcpResourcesPersistFailed: {Reason: gcpResourcesPersistFailed, Type: corev1.EventTypeWarning}, + virtualServiceCreateFailed: {Reason: virtualServiceCreateFailed, Type: corev1.EventTypeWarning}, + k8sServiceCreateFailed: {Reason: k8sServiceCreateFailed, Type: corev1.EventTypeWarning}, + topicCreateFailed: {Reason: topicCreateFailed, Type: corev1.EventTypeWarning}, + topicDeleteFailed: {Reason: topicDeleteFailed, Type: corev1.EventTypeWarning}, + subscriptionSyncFailed: {Reason: subscriptionSyncFailed, Type: corev1.EventTypeWarning}, + subscriptionDeleteFailed: {Reason: subscriptionDeleteFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -162,6 +178,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[subscriptionSyncFailed], + }, }, { Name: "Channel deleted - problem checking subscription existence", @@ -182,6 +201,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[subscriptionSyncFailed], + }, }, { Name: "Channel deleted - subscription does not exist", @@ -201,6 +223,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribersWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel deleted - subscription deletion fails", @@ -222,6 +247,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[subscriptionSyncFailed], + }, }, { Name: "Channel deleted - subscription deletion succeeds", @@ -241,6 +269,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribersWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel deleted - problem checking topic existence", @@ -261,6 +292,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[topicDeleteFailed], + }, }, { Name: "Channel deleted - No status.internal", @@ -285,6 +319,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithoutFinalizerOrPCS(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel deleted - topic does not exist", @@ -304,6 +341,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribersWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel deleted - topic deletion fails", @@ -325,6 +365,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[topicDeleteFailed], + }, }, { Name: "Channel deleted - topic deletion succeeds", @@ -344,6 +387,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribersWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel deleted - finalizer removed", @@ -354,6 +400,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Finalizer added", @@ -367,6 +416,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizer(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "GetCredential fails", @@ -378,6 +430,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizer(), }, WantErrMsg: testcreds.InvalidCredsError, + WantEvent: []corev1.Event{ + events[gcpCredentialsReadFailed], + }, }, { Name: "Error reading status.internal", @@ -385,6 +440,9 @@ func TestReconcile(t *testing.T) { makeChannelWithBadInternalStatus(), }, WantErrMsg: "json: cannot unmarshal number into Go struct field GcpPubSubChannelStatus.topic of type string", + WantEvent: []corev1.Event{ + events[channelReadStatusFailed], + }, }, { Name: "K8s service get fails", @@ -399,6 +457,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndPCS(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[k8sServiceCreateFailed], + }, }, { Name: "K8s service creation fails", @@ -414,6 +475,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndPCS(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[k8sServiceCreateFailed], + }, }, { Name: "Virtual service get fails", @@ -432,6 +496,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndPCSAndAddress(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[virtualServiceCreateFailed], + }, }, { Name: "Virtual service creation fails", @@ -449,6 +516,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndPCSAndAddress(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[virtualServiceCreateFailed], + }, }, { Name: "VirtualService already exists - not owned by Channel", @@ -461,6 +531,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyChannel(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Error planning - subscriber missing UID", @@ -472,6 +545,9 @@ func TestReconcile(t *testing.T) { makeChannelWithFinalizerAndSubscriberWithoutUID(), }, WantErrMsg: "empty reference UID: {&ObjectReference{Kind:,Namespace:,Name:,UID:,APIVersion:,ResourceVersion:,FieldPath:,} http://foo/ }", + WantEvent: []corev1.Event{ + events[gcpResourcesPlanFailed], + }, }, { Name: "Persist plan", @@ -485,6 +561,9 @@ func TestReconcile(t *testing.T) { WantResult: reconcile.Result{ Requeue: true, }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Create Topic - problem creating client", @@ -503,6 +582,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizerAndPCSAndAddress(), }, + WantEvent: []corev1.Event{ + events[topicCreateFailed], + }, }, { Name: "Create Topic - problem checking existence", @@ -525,6 +607,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizerAndPCSAndAddress(), }, + WantEvent: []corev1.Event{ + events[topicCreateFailed], + }, }, { Name: "Create Topic - topic already exists", @@ -546,6 +631,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyChannel(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Create Topic - error creating topic", @@ -566,6 +654,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizerAndPCSAndAddress(), }, + WantEvent: []corev1.Event{ + events[topicCreateFailed], + }, }, { Name: "Create Topic - topic create succeeds", @@ -578,6 +669,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyChannel(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Create Subscriptions - problem checking exists", @@ -600,6 +694,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCSAndAddress(), }, + WantEvent: []corev1.Event{ + events[subscriptionSyncFailed], + }, }, { Name: "Create Subscriptions - already exists", @@ -621,6 +718,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Create Subscriptions - create fails", @@ -641,6 +741,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizerAndPCSAndAddress(), }, + WantEvent: []corev1.Event{ + events[subscriptionSyncFailed], + }, }, { Name: "Create Subscriptions - create succeeds", @@ -653,6 +756,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyChannelWithSubscribers(), }, + WantEvent: []corev1.Event{ + events[channelReconciled], + }, }, { Name: "Channel get for update fails", @@ -666,6 +772,9 @@ func TestReconcile(t *testing.T) { MockGets: errorOnSecondChannelGet(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, { Name: "Channel update fails", @@ -679,6 +788,9 @@ func TestReconcile(t *testing.T) { MockUpdates: errorUpdatingChannel(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, { Name: "Channel status update fails", InitialState: []runtime.Object{ @@ -691,6 +803,9 @@ func TestReconcile(t *testing.T) { MockStatusUpdates: errorUpdatingChannelStatus(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[channelReconciled], events[channelUpdateStatusFailed], + }, }, } diff --git a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go index b2248144b7f..bf388ca648a 100644 --- a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go @@ -33,6 +33,11 @@ import ( const ( // Name is the name of the GCP PubSub ClusterChannelProvisioner. Name = "gcp-pubsub" + + // Name of the corev1.Events emitted from the reconciliation process + ccpReconciled = "CcpReconciled" + ccpReconcileFailed = "CcpReconcileFailed" + ccpUpdateStatusFailed = "CcpUpdateStatusFailed" ) type reconciler struct { @@ -82,12 +87,17 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err reconcileErr := r.reconcile(ctx, ccp) if reconcileErr != nil { logging.FromContext(ctx).Info("Error reconciling ClusterChannelProvisioner", zap.Error(reconcileErr)) + r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpReconcileFailed, "ClusterChannelProvisioner reconciliation failed: %v", err) // Note that we do not return the error here, because we want to update the Status // regardless of the error. + } else { + logging.FromContext(ctx).Info("ClusterChannelProvisioner reconciled") + r.recorder.Eventf(ccp, corev1.EventTypeNormal, ccpReconciled, "ClusterChannelProvisioner reconciled: %q", ccp.Name) } if err = util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); err != nil { logging.FromContext(ctx).Info("Error updating ClusterChannelProvisioner Status", zap.Error(err)) + r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpUpdateStatusFailed, "Failed to update ClusterChannelProvisioner's status: %v", err) return reconcile.Result{}, err } diff --git a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go index 3518c9bf21f..75fd60cf0eb 100644 --- a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go @@ -43,6 +43,13 @@ var ( // deletionTime is used when objects are marked as deleted. Rfc3339Copy() // truncates to seconds to match the loss of precision during serialization. deletionTime = metav1.Now().Rfc3339Copy() + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + ccpReconciled: {Reason: ccpReconciled, Type: corev1.EventTypeNormal}, + ccpReconcileFailed: {Reason: ccpReconcileFailed, Type: corev1.EventTypeWarning}, + ccpUpdateStatusFailed: {Reason: ccpUpdateStatusFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -148,6 +155,9 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeDeletingClusterChannelProvisioner(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Mark Ready", @@ -157,6 +167,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeReadyClusterChannelProvisioner(), }, + WantEvent: []corev1.Event{ + events[ccpReconciled], + }, }, { Name: "Error getting CCP for updating Status", @@ -168,6 +181,9 @@ func TestReconcile(t *testing.T) { MockGets: oneSuccessfulClusterChannelProvisionerGet(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[ccpReconciled], events[ccpUpdateStatusFailed], + }, }, { Name: "Error updating Status", @@ -181,6 +197,9 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[ccpReconciled], events[ccpUpdateStatusFailed], + }, }, } diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go index c4591b380c8..7e369e45072 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go @@ -22,6 +22,8 @@ import ( "sync" "time" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" @@ -41,6 +43,11 @@ import ( const ( finalizerName = controllerAgentName + + // Name of the corev1.Events emitted from the reconciliation process + dispatcherReconciled = "DispatcherReconciled" + dispatcherReconcileFailed = "DispatcherReconcileFailed" + dispatcherUpdateStatusFailed = "DispatcherUpdateStatusFailed" ) type channelName = types.NamespacedName @@ -119,12 +126,17 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err requeue, reconcileErr := r.reconcile(logging.With(ctx, zap.Any("channel", c)), c, pcs) if reconcileErr != nil { logging.FromContext(ctx).Info("Error reconciling Channel", zap.Error(reconcileErr)) + r.recorder.Eventf(c, v1.EventTypeWarning, dispatcherReconcileFailed, "Dispatcher reconciliation failed: %v", err) // Note that we do not return the error here, because we want to update the finalizers // regardless of the error. + } else { + logging.FromContext(ctx).Info("Channel reconciled") + r.recorder.Eventf(c, v1.EventTypeNormal, dispatcherReconciled, "Dispatcher reconciled: %q", c.Name) } if err = util.UpdateChannel(ctx, r.client, c); err != nil { logging.FromContext(ctx).Info("Error updating Channel Status", zap.Error(err)) + r.recorder.Eventf(c, v1.EventTypeWarning, dispatcherUpdateStatusFailed, "Failed to update Channel's dispatcher status: %v", err) return reconcile.Result{}, err } diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go index 7b11e807c82..d8078590f02 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go @@ -91,6 +91,13 @@ var ( }, }, } + + // map of events to set test cases' expectations easier + events = map[string]corev1.Event{ + dispatcherReconciled: {Reason: dispatcherReconciled, Type: corev1.EventTypeNormal}, + dispatcherReconcileFailed: {Reason: dispatcherReconcileFailed, Type: corev1.EventTypeWarning}, + dispatcherUpdateStatusFailed: {Reason: dispatcherUpdateStatusFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -182,6 +189,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithSubscribersWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "Channel deleted - finalizer removed", @@ -192,6 +202,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeDeletingChannelWithoutFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "Finalizer added", @@ -204,6 +217,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "GetCredential fails", @@ -215,6 +231,9 @@ func TestReconcile(t *testing.T) { makeChannelWithSubscribersAndFinalizer(), }, WantErrMsg: testcreds.InvalidCredsError, + WantEvent: []corev1.Event{ + events[dispatcherReconcileFailed], + }, }, { Name: "Channel update fails - cannot create PubSub client", @@ -228,6 +247,9 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[dispatcherReconcileFailed], + }, }, { Name: "Receive errors", @@ -260,6 +282,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "PubSub Subscription.Receive already running", @@ -282,6 +307,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "Delete old Subscriptions", @@ -304,6 +332,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithSubscribersAndFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "Delete all old Subscriptions", @@ -326,6 +357,9 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizer(), }, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], + }, }, { Name: "Channel update fails", @@ -337,6 +371,9 @@ func TestReconcile(t *testing.T) { MockUpdates: errorUpdatingChannel(), }, WantErrMsg: testErrorMessage, + WantEvent: []corev1.Event{ + events[dispatcherReconciled], events[dispatcherUpdateStatusFailed], + }, }, // Note - we do not test update status since this dispatcher only adds // finalizers to the channel