Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 33 additions & 58 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/yaml"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
Expand All @@ -43,14 +45,15 @@ import (
func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
eventLogger := log.FromContext(r.Context())

ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()

var allAlerts apiv1beta2.AlertList
err := s.kubeClient.List(ctx, &allAlerts)
if err != nil {
s.logger.Error(err, "listing alerts failed")
eventLogger.Error(err, "listing alerts failed")
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -59,6 +62,9 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
alerts := make([]apiv1beta2.Alert, 0)
each_alert:
for _, alert := range allAlerts.Items {
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
ctx := log.IntoContext(ctx, alertLogger)
Comment thread
makkes marked this conversation as resolved.

// skip suspended and not ready alerts
isReady := conditions.IsReady(&alert)
if alert.Spec.Suspend || !isReady {
Expand All @@ -75,7 +81,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
break
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
alertLogger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
}
}
if !include {
Expand All @@ -91,7 +97,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
continue each_alert
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
alertLogger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
}
}
}
Expand All @@ -109,27 +115,24 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
}

if len(alerts) == 0 {
s.logger.Info("Discarding event, no alerts found for the involved object",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
eventLogger.Info("Discarding event, no alerts found for the involved object")
w.WriteHeader(http.StatusAccepted)
return
}

s.logger.Info(fmt.Sprintf("Dispatching event: %s", event.Message),
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
eventLogger.Info(fmt.Sprintf("Dispatching event: %s", event.Message))

// dispatch notifications
for _, alert := range alerts {
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
ctx := log.IntoContext(ctx, alertLogger)

// verify if event comes from a different namespace
if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace {
accessDenied := fmt.Errorf(
"alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked",
alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
s.logger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
alertLogger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
continue
}

Expand All @@ -138,10 +141,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, providerName, &provider)
if err != nil {
s.logger.Error(err, "failed to read provider",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read provider")
continue
}

Expand All @@ -161,10 +161,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, secretName, &secret)
if err != nil {
s.logger.Error(err, "failed to read secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read secret")
continue
}

Expand All @@ -191,10 +188,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
if h, ok := secret.Data["headers"]; ok {
err := yaml.Unmarshal(h, &headers)
if err != nil {
s.logger.Error(err, "failed to read headers from secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read headers from secret")
continue
}
}
Expand All @@ -207,68 +201,51 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, secretName, &secret)
if err != nil {
s.logger.Error(err, "failed to read secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read cert secret")
continue
}

caFile, ok := secret.Data["caFile"]
if !ok {
s.logger.Error(err, "failed to read secret key caFile",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read secret key caFile")
continue
}

certPool = x509.NewCertPool()
ok = certPool.AppendCertsFromPEM(caFile)
if !ok {
s.logger.Error(err, "could not append to cert pool",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "could not append to cert pool")
continue
}
}

if webhook == "" {
s.logger.Error(nil, "provider has no address",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(nil, "provider has no address")
continue
}

factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID))
sender, err := factory.Notifier(provider.Spec.Type)
if err != nil {
s.logger.Error(err, "failed to initialize provider",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to initialize provider")
continue
}

notification := *event.DeepCopy()
s.enhanceEventWithAlertMetadata(&notification, alert)
s.enhanceEventWithAlertMetadata(ctx, &notification, alert)

go func(n notifier.Interface, e eventv1.Event) {
ctx, cancel := context.WithTimeout(context.Background(), provider.GetTimeout())
defer cancel()
ctx = log.IntoContext(ctx, alertLogger)
if err := n.Post(ctx, e); err != nil {
maskedErrStr, maskErr := masktoken.MaskTokenFromString(err.Error(), token)
if maskErr != nil {
err = maskErr
} else {
err = errors.New(maskedErrStr)
}
s.logger.Error(err, "failed to send notification",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
alertLogger.Error(err, "failed to send notification")
}
}(sender, notification)
}
Expand All @@ -278,6 +255,8 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
}

func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Event, source apiv1.CrossNamespaceObjectReference, severity string) bool {
alertLogger := log.FromContext(ctx)

if event.InvolvedObject.Namespace == source.Namespace && event.InvolvedObject.Kind == source.Kind {
if event.Severity == severity || severity == eventv1.EventSeverityInfo {
labelMatch := true
Expand All @@ -291,15 +270,14 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
Namespace: event.InvolvedObject.Namespace,
Name: event.InvolvedObject.Name,
}, &obj); err != nil {
s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion)
alertLogger.Error(err, "error getting the involved object")
}

sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: source.MatchLabels,
})
if err != nil {
s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
alertLogger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
}

labelMatch = sel.Matches(labels.Set(obj.GetLabels()))
Expand All @@ -314,7 +292,7 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
return false
}

func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta2.Alert) {
func (s *EventServer) enhanceEventWithAlertMetadata(ctx context.Context, event *eventv1.Event, alert apiv1beta2.Alert) {
meta := event.Metadata
if meta == nil {
meta = make(map[string]string)
Expand All @@ -324,11 +302,8 @@ func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert
if _, alreadyPresent := meta[key]; !alreadyPresent {
meta[key] = value
} else {
s.logger.Info("metadata key found in the existing set of metadata",
"reconciler kind", apiv1beta2.AlertKind,
"name", alert.Name,
"namespace", alert.Namespace,
"key", key)
log.FromContext(ctx).
Info("metadata key found in the existing set of metadata", "key", key)
}
}

Expand Down
3 changes: 2 additions & 1 deletion internal/server/event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package server

import (
"context"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) {
t.Run(name, func(t *testing.T) {
g := NewGomegaWithT(t)

s.enhanceEventWithAlertMetadata(&tt.event, tt.alert)
s.enhanceEventWithAlertMetadata(context.Background(), &tt.event, tt.alert)
g.Expect(tt.event.Metadata).To(BeEquivalentTo(tt.expectedMetadata))
})
}
Expand Down
31 changes: 17 additions & 14 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
Expand Down Expand Up @@ -68,8 +69,8 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
var handler http.Handler = http.HandlerFunc(s.handleEvent())
for _, middleware := range []func(http.Handler) http.Handler{
limitMiddleware.Handle,
s.logRateLimitMiddleware,
s.cleanupMetadataMiddleware,
logRateLimitMiddleware,
s.eventMiddleware,
} {
handler = middleware(handler)
}
Expand Down Expand Up @@ -100,10 +101,12 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
}
}

// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
// eventMiddleware cleans up the event metadata using cleanupMetadata() and
// adds the cleaned event in the request context which can then be queried and
// used directly by the other http handlers.
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
// used directly by the other http handlers. This middleware also adds a
// logger with the event's involved object's reference information to the
// request context.
func (s *EventServer) eventMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -124,10 +127,13 @@ func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {

cleanupMetadata(event)

ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
reqWithEvent := r.WithContext(ctxWithEvent)
eventLogger := s.logger.WithValues("eventInvolvedObject", event.InvolvedObject)

h.ServeHTTP(w, reqWithEvent)
enhancedCtx := context.WithValue(r.Context(), eventContextKey{}, event)
enhancedCtx = log.IntoContext(enhancedCtx, eventLogger)
enhancedReq := r.WithContext(enhancedCtx)

h.ServeHTTP(w, enhancedReq)
})
}

Expand Down Expand Up @@ -172,7 +178,7 @@ func (r *statusRecorder) WriteHeader(status int) {
r.ResponseWriter.WriteHeader(status)
}

func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
func logRateLimitMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
recorder := &statusRecorder{
ResponseWriter: w,
Expand All @@ -181,11 +187,8 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
h.ServeHTTP(recorder, r)

if recorder.Status == http.StatusTooManyRequests {
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
log.FromContext(r.Context()).V(1).
Info("Discarding event, rate limiting duplicate events")
}
})
}
Expand Down