Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions db/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package db

import (
"context"
"fmt"
"log"
"reflect"

"go.mongodb.org/mongo-driver/v2/bson"
)

type DocumentKey[K any] struct {
Key *K `bson:"_id,omitempty"`
}

type UpdateDescription[E any] struct {
UpdatedFields *E `bson:"updatedFields,omitempty"`
RemovedFields []string `bson:"removedFields,omitempty"`
}

type Event[K any, E any] struct {
Doc DocumentKey[K] `bson:"documentKey,omitempty"`
Op string `bson:"operationType,omitempty"`
Time bson.Timestamp `bson:"clusterTime,omitempty"`
Entry *E `bson:"fullDocument,omitempty"`
Updates *UpdateDescription[E] `bson:"updateDescription,omitempty"`
}

func (e *Event[K, E]) LogEvent() {
msg := fmt.Sprintf("Event: Key=%v, Op=%s, Time=%v", e.Doc.Key, e.Op, e.Time)
if e.Entry != nil {
msg += fmt.Sprintf(", Entry= %v", *e.Entry)
}
if e.Updates != nil && e.Updates.UpdatedFields != nil {
msg += fmt.Sprintf(", Updates=%v", *e.Updates.UpdatedFields)
}

log.Print(msg)
}

type EventLogger[K any, E any] struct {
col StoreCollection
ts *bson.Timestamp
}

func NewEventLogger[K any, E any](col StoreCollection, timestamp *bson.Timestamp) *EventLogger[K, E] {
logger := &EventLogger[K, E]{
col: col,
ts: timestamp,
}
return logger
}

func (l *EventLogger[K, E]) Start(ctx context.Context) error {
var event Event[K, E]
eventType := reflect.TypeOf(event)

log.Printf("Starting event logger for collection with event type: %s", eventType)

return l.col.startEventLogger(ctx, eventType, l.ts)
}
76 changes: 76 additions & 0 deletions db/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,82 @@ func (c *mongoCollection) Watch(ctx context.Context, filter any, cb WatchCallbac
return nil
}

// startEventLogger starts the event logger for the collection and trigger logger for events
func (c *mongoCollection) startEventLogger(ctx context.Context, eventType reflect.Type, timestamp *bson.Timestamp) error {
// TODO(prabhjot) if we may need to enable pre and post images for change streams
// to get the full document before and after the change, probably not needed
// as of now, as the continuity or sequence of the events will anyway provide
// the complete context of the change to the object
/*
func() {
// Enable pre-images on the collection
cmd := bson.D{
{Key: "collMod", Value: c.colName},
{Key: "changeStreamPreAndPostImages", Value: bson.D{
{Key: "enabled", Value: true},
}},
}

var result bson.M
if err := c.col.Database().RunCommand(ctx, cmd).Decode(&result); err != nil {
log.Fatalf("Failed to enable pre-images: %v", err)
}
}()
*/

opts := options.ChangeStream()
opts.SetFullDocumentBeforeChange(options.WhenAvailable)
opts.SetFullDocument(options.WhenAvailable)

if timestamp != nil {
opts.SetStartAtOperationTime(timestamp)
}

// start watching on the collection with required context
stream, err := c.col.Watch(ctx, mongo.Pipeline{}, opts)
if err != nil {
return err
}

// run the loop on stream in a separate go routine
// allowing the watch starter to resume control and work with
// managing Watch stream by virtue of passed context
go func() {
// ensure closing of the open stream in case of returning from here
// keeping the handles and stack clean
// Note: this may not be required, if loop doesn't require it
// but still it is safe to keep ensuring appropriate cleanup
defer func() {
// ignore the error returned by stream close as of now
_ = stream.Close(context.Background())
}()
defer func() {
if !errors.Is(ctx.Err(), context.Canceled) {
// panic if the return from this function is not
// due to context being canceled
log.Panicf("End of stream observed due to error %s", stream.Err())
}
}()
for stream.Next(ctx) {
event := reflect.New(eventType)

if err := stream.Decode(event.Interface()); err != nil {
log.Printf("Closing watch due to decoding error %s", err)
return
}

method := event.MethodByName("LogEvent")
if !method.IsValid() {
log.Println("Invalid Log Events method, skipping event logging")
} else {
method.Call([]reflect.Value{})
}
}
}()

return nil
}

type mongoStore struct {
Store
db *mongo.Database
Expand Down
4 changes: 4 additions & 0 deletions db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package db
import (
"context"
"reflect"

"go.mongodb.org/mongo-driver/v2/bson"
)

// WatchCallbackfn responsible for
Expand Down Expand Up @@ -57,6 +59,8 @@ type StoreCollection interface {
// function to receive only conditional notifications of the events
// listener is interested about
Watch(ctx context.Context, filter any, cb WatchCallbackfn) error

startEventLogger(ctx context.Context, eventType reflect.Type, timestamp *bson.Timestamp) error
}

// interface definition for a store, responsible for holding group
Expand Down