diff --git a/db/const.go b/db/const.go index d5cbdb5..f0230a6 100644 --- a/db/const.go +++ b/db/const.go @@ -9,3 +9,14 @@ package db const ( defaultSourceIdentifier = "MongoClientCore" ) + +const ( + // Operation string for mongo add/insert operation + MongoAddOp = "insert" + + // Operation string for mongo update operation + MongoUpdateOp = "update" + + // Operation string for mongo delete operation + MongoDeleteOp = "delete" +) diff --git a/db/mongo.go b/db/mongo.go index 6f0fda3..414e844 100644 --- a/db/mongo.go +++ b/db/mongo.go @@ -8,7 +8,9 @@ package db import ( "context" + "log" "net" + "reflect" "strconv" "go.mongodb.org/mongo-driver/bson" @@ -24,6 +26,23 @@ type mongoCollection struct { parent *mongoStore // handler for the parent mongo DB object colName string // name of the collection this collection object is working with col *mongo.Collection + keyType reflect.Type +} + +// Set KeyType for the collection, this is not mandatory +// while the key type will be used by the interface implementer +// mainly for Watch Callback for providing decoded key, if not +// set watch will be working with the default decoders of +// interface implementer +// only pointer key type is supported as of now +// returns error if the key type is not a pointer +func (c *mongoCollection) SetKeyType(keyType reflect.Type) error { + if keyType.Kind() != reflect.Ptr { + // return error, as only pointer key type is supported + return errors.Wrap(errors.InvalidArgument, "key type is not a pointer") + } + c.keyType = keyType + return nil } // inserts one entry with given key and data to the collection @@ -147,6 +166,84 @@ func (c *mongoCollection) DeleteOne(ctx context.Context, key interface{}) error return nil } +// watch allows getting notified whenever a change happens to a document +// in the collection +func (c *mongoCollection) Watch(ctx context.Context, cb WatchCallbackfn) error { + // start watching on the collection with passed context + stream, err := c.col.Watch(ctx, mongo.Pipeline{}) + if err != nil { + return err + } + + // run the loop on stream in a separate go routine + // allowing the watch starter to resume control and work with + // managing Watch stream by virtue of passed context + go func() { + // take a snapshot of keyTpe for processing watch + keyType := c.keyType + // ensure closing of the open stream in case of returning from here + // keeping the handles and stack clean + // Note: this may not be required, if loop doesn't require it + // but still it is safe to keep ensuring appropriate cleanup + defer stream.Close(context.Background()) + defer func() { + if !errors.Is(ctx.Err(), context.Canceled) { + // panic if the return from this function is not + // due to context being canceled + log.Panicf("End of stream observed due to error %s", stream.Err()) + } + }() + for stream.Next(ctx) { + var data bson.M + if err := stream.Decode(&data); err != nil { + log.Printf("Closing watch due to decoding error %s", err) + return + } + + op, ok := data["operationType"].(string) + if !ok { + log.Printf("Closing watch due to error, unable to find decode operation type ") + return + } + + dk, ok := data["documentKey"].(bson.M) + if !ok { + log.Printf("Closing watch due to error, unable to find key") + return + } + + bKey, ok := dk["_id"].(bson.M) + if !ok { + log.Printf("Closing watch due to error, unable to find id") + return + } + + // key that will be shared with callback function + var key interface{} + if keyType != nil { + key = reflect.New(keyType.Elem()).Interface() + } else { + key = bson.D{} + } + + marshaledData, err := bson.Marshal(bKey) + if err != nil { + log.Printf("Closing watch due to error, while bson Marshal : %q", err) + return + } + + err = bson.Unmarshal(marshaledData, key) + if err != nil { + log.Printf("Closing watch due to error, while bson Unmarshal to key : %q", err) + return + } + cb(op, key) + } + }() + + return nil +} + type mongoStore struct { Store db *mongo.Database diff --git a/db/mongo_test.go b/db/mongo_test.go index 8652039..ab1d5ce 100644 --- a/db/mongo_test.go +++ b/db/mongo_test.go @@ -5,7 +5,9 @@ package db import ( "context" + "reflect" "testing" + "time" ) type MyKey struct { @@ -136,3 +138,126 @@ func Test_ClientConnection(t *testing.T) { } }) } + +var ( + mongoTestAddUpOps int + mongoTestDeleteOps int +) + +func myKeyWatcher(op string, wKey interface{}) { + _ = wKey.(*MyKey) + switch op { + case MongoAddOp, MongoUpdateOp: + mongoTestAddUpOps += 1 + case MongoDeleteOp: + mongoTestDeleteOps += 1 + } +} + +func Test_CollectionWatch(t *testing.T) { + t.Run("WatchTest", func(t *testing.T) { + config := &MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := NewMongoClient(config) + + if err != nil { + t.Errorf("failed to connect to mongo DB Error: %s", err) + return + } + + err = client.HealthCheck(context.Background()) + if err != nil { + t.Errorf("failed to perform Health check with DB Error: %s", err) + } + + s := client.GetDataStore("test") + + col := s.GetCollection("collection1") + + err = col.SetKeyType(reflect.TypeOf(MyKey{})) + if err == nil { + t.Errorf("collection should not allow key type, when not a pointer") + } + + // set key type to ptr of my key + err = col.SetKeyType(reflect.TypeOf(&MyKey{})) + if err != nil { + t.Errorf("failed to set key type for watch: %s", err) + } + + watchCtx, cancelfn := context.WithCancel(context.Background()) + defer func() { + time.Sleep(2 * time.Second) + cancelfn() + if mongoTestAddUpOps != 3 { + t.Errorf("Add/Update Notify: Got %d, expected 3", mongoTestAddUpOps) + } + if mongoTestDeleteOps != 2 { + t.Errorf("Delete Notify: Got %d, expected 2", mongoTestDeleteOps) + } + }() + // reset counters + mongoTestAddUpOps = 0 + mongoTestDeleteOps = 0 + col.Watch(watchCtx, myKeyWatcher) + + key := &MyKey{ + Name: "test-key", + } + data := &MyData{ + Desc: "sample-description", + Val: &InternaData{ + Test: "abc", + }, + } + + err = col.InsertOne(context.Background(), key, data) + if err != nil { + t.Errorf("failed to insert an entry to collection Error: %s", err) + } + + val := &MyData{} + err = col.FindOne(context.Background(), key, val) + if err != nil { + t.Errorf("failed to find the entry Error: %s", err) + } + + data.Desc = "new description" + data.Val.Test = "xyz" + err = col.UpdateOne(context.Background(), key, data, false) + if err != nil { + t.Errorf("failed to update an entry to collection Error: %s", err) + } + + val = &MyData{} + err = col.FindOne(context.Background(), key, val) + if err != nil { + t.Errorf("failed to find the entry Error: %s", err) + } + + err = col.DeleteOne(context.Background(), key) + if err != nil { + t.Errorf("failed to delete entry using key Error: %s", err) + } + + err = col.DeleteOne(context.Background(), key) + if err == nil { + t.Errorf("attemptting delete on already deleted entry, but didn't receive expected error") + } + + err = col.UpdateOne(context.Background(), key, data, true) + if err != nil { + t.Errorf("failed to update an entry to collection Error: %s", err) + } + + err = col.DeleteOne(context.Background(), key) + if err != nil { + t.Errorf("failed to delete entry using key Error: %s", err) + } + }) +} diff --git a/db/store.go b/db/store.go index 850ad42..01b2dbc 100644 --- a/db/store.go +++ b/db/store.go @@ -8,10 +8,23 @@ package db import ( "context" + "reflect" ) +// WatchCallbackfn responsible for +type WatchCallbackfn func(op string, key interface{}) + // interface definition for a collection in store type StoreCollection interface { + // Set KeyType for the collection, this is not mandatory + // while the key type will be used by the interface implementer + // mainly for Watch Callback for providing decoded key, if not + // set watch will be working with the default decoders of + // interface implementer + // only pointer key type is supported as of now + // returns error if the key type is not a pointer + SetKeyType(keyType reflect.Type) error + // insert one entry to the collection for the given key and data InsertOne(ctx context.Context, key interface{}, data interface{}) error @@ -30,6 +43,10 @@ type StoreCollection interface { // remove one entry from the collection matching the given key DeleteOne(ctx context.Context, key interface{}) error + + // watch allows getting notified whenever a change happens to a document + // in the collection + Watch(ctx context.Context, cb WatchCallbackfn) error } // interface definition for a store, responsible for holding group diff --git a/errors/errors.go b/errors/errors.go index 60cfd3e..13d6c3a 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -3,6 +3,14 @@ package errors +import ( + base "errors" +) + +func Is(err error, target error) bool { + return base.Is(err, target) +} + // get the error code if the error is // associated to recognizable error types func getErrCode(err error) ErrCode {