From d1546181ad8bdc26b774d67182e5bd3c21496004 Mon Sep 17 00:00:00 2001 From: Prabhjot Singh Sethi Date: Thu, 7 Aug 2025 05:32:08 +0000 Subject: [PATCH] Provide constructs for basic events logger Signed-off-by: Prabhjot Singh Sethi --- db/event.go | 64 ++++++++++++++++++++++++++++++++++++++++++++ db/mongo.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++ db/store.go | 4 +++ 3 files changed, 144 insertions(+) create mode 100644 db/event.go diff --git a/db/event.go b/db/event.go new file mode 100644 index 0000000..fc2907f --- /dev/null +++ b/db/event.go @@ -0,0 +1,64 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package db + +import ( + "context" + "fmt" + "log" + "reflect" + + "go.mongodb.org/mongo-driver/v2/bson" +) + +type DocumentKey[K any] struct { + Key *K `bson:"_id,omitempty"` +} + +type UpdateDescription[E any] struct { + UpdatedFields *E `bson:"updatedFields,omitempty"` + RemovedFields []string `bson:"removedFields,omitempty"` +} + +type Event[K any, E any] struct { + Doc DocumentKey[K] `bson:"documentKey,omitempty"` + Op string `bson:"operationType,omitempty"` + Time bson.Timestamp `bson:"clusterTime,omitempty"` + Entry *E `bson:"fullDocument,omitempty"` + Updates *UpdateDescription[E] `bson:"updateDescription,omitempty"` +} + +func (e *Event[K, E]) LogEvent() { + msg := fmt.Sprintf("Event: Key=%v, Op=%s, Time=%v", e.Doc.Key, e.Op, e.Time) + if e.Entry != nil { + msg += fmt.Sprintf(", Entry= %v", *e.Entry) + } + if e.Updates != nil && e.Updates.UpdatedFields != nil { + msg += fmt.Sprintf(", Updates=%v", *e.Updates.UpdatedFields) + } + + log.Print(msg) +} + +type EventLogger[K any, E any] struct { + col StoreCollection + ts *bson.Timestamp +} + +func NewEventLogger[K any, E any](col StoreCollection, timestamp *bson.Timestamp) *EventLogger[K, E] { + logger := &EventLogger[K, E]{ + col: col, + ts: timestamp, + } + return logger +} + +func (l *EventLogger[K, E]) Start(ctx context.Context) error { + var event Event[K, E] + eventType := reflect.TypeOf(event) + + log.Printf("Starting event logger for collection with event type: %s", eventType) + + return l.col.startEventLogger(ctx, eventType, l.ts) +} diff --git a/db/mongo.go b/db/mongo.go index 2a5c3f6..b8d1594 100644 --- a/db/mongo.go +++ b/db/mongo.go @@ -315,6 +315,82 @@ func (c *mongoCollection) Watch(ctx context.Context, filter any, cb WatchCallbac return nil } +// startEventLogger starts the event logger for the collection and trigger logger for events +func (c *mongoCollection) startEventLogger(ctx context.Context, eventType reflect.Type, timestamp *bson.Timestamp) error { + // TODO(prabhjot) if we may need to enable pre and post images for change streams + // to get the full document before and after the change, probably not needed + // as of now, as the continuity or sequence of the events will anyway provide + // the complete context of the change to the object + /* + func() { + // Enable pre-images on the collection + cmd := bson.D{ + {Key: "collMod", Value: c.colName}, + {Key: "changeStreamPreAndPostImages", Value: bson.D{ + {Key: "enabled", Value: true}, + }}, + } + + var result bson.M + if err := c.col.Database().RunCommand(ctx, cmd).Decode(&result); err != nil { + log.Fatalf("Failed to enable pre-images: %v", err) + } + }() + */ + + opts := options.ChangeStream() + opts.SetFullDocumentBeforeChange(options.WhenAvailable) + opts.SetFullDocument(options.WhenAvailable) + + if timestamp != nil { + opts.SetStartAtOperationTime(timestamp) + } + + // start watching on the collection with required context + stream, err := c.col.Watch(ctx, mongo.Pipeline{}, opts) + if err != nil { + return err + } + + // run the loop on stream in a separate go routine + // allowing the watch starter to resume control and work with + // managing Watch stream by virtue of passed context + go func() { + // ensure closing of the open stream in case of returning from here + // keeping the handles and stack clean + // Note: this may not be required, if loop doesn't require it + // but still it is safe to keep ensuring appropriate cleanup + defer func() { + // ignore the error returned by stream close as of now + _ = stream.Close(context.Background()) + }() + defer func() { + if !errors.Is(ctx.Err(), context.Canceled) { + // panic if the return from this function is not + // due to context being canceled + log.Panicf("End of stream observed due to error %s", stream.Err()) + } + }() + for stream.Next(ctx) { + event := reflect.New(eventType) + + if err := stream.Decode(event.Interface()); err != nil { + log.Printf("Closing watch due to decoding error %s", err) + return + } + + method := event.MethodByName("LogEvent") + if !method.IsValid() { + log.Println("Invalid Log Events method, skipping event logging") + } else { + method.Call([]reflect.Value{}) + } + } + }() + + return nil +} + type mongoStore struct { Store db *mongo.Database diff --git a/db/store.go b/db/store.go index fb85eab..2c14331 100644 --- a/db/store.go +++ b/db/store.go @@ -9,6 +9,8 @@ package db import ( "context" "reflect" + + "go.mongodb.org/mongo-driver/v2/bson" ) // WatchCallbackfn responsible for @@ -57,6 +59,8 @@ type StoreCollection interface { // function to receive only conditional notifications of the events // listener is interested about Watch(ctx context.Context, filter any, cb WatchCallbackfn) error + + startEventLogger(ctx context.Context, eventType reflect.Type, timestamp *bson.Timestamp) error } // interface definition for a store, responsible for holding group