Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions db/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,14 @@ package db
const (
defaultSourceIdentifier = "MongoClientCore"
)

const (
// Operation string for mongo add/insert operation
MongoAddOp = "insert"

// Operation string for mongo update operation
MongoUpdateOp = "update"

// Operation string for mongo delete operation
MongoDeleteOp = "delete"
)
97 changes: 97 additions & 0 deletions db/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ package db

import (
"context"
"log"
"net"
"reflect"
"strconv"

"go.mongodb.org/mongo-driver/bson"
Expand All @@ -24,6 +26,23 @@ type mongoCollection struct {
parent *mongoStore // handler for the parent mongo DB object
colName string // name of the collection this collection object is working with
col *mongo.Collection
keyType reflect.Type
}

// Set KeyType for the collection, this is not mandatory
// while the key type will be used by the interface implementer
// mainly for Watch Callback for providing decoded key, if not
// set watch will be working with the default decoders of
// interface implementer
// only pointer key type is supported as of now
// returns error if the key type is not a pointer
func (c *mongoCollection) SetKeyType(keyType reflect.Type) error {
if keyType.Kind() != reflect.Ptr {
// return error, as only pointer key type is supported
return errors.Wrap(errors.InvalidArgument, "key type is not a pointer")
}
c.keyType = keyType
return nil
}

// inserts one entry with given key and data to the collection
Expand Down Expand Up @@ -147,6 +166,84 @@ func (c *mongoCollection) DeleteOne(ctx context.Context, key interface{}) error
return nil
}

// watch allows getting notified whenever a change happens to a document
// in the collection
func (c *mongoCollection) Watch(ctx context.Context, cb WatchCallbackfn) error {
// start watching on the collection with passed context
stream, err := c.col.Watch(ctx, mongo.Pipeline{})
if err != nil {
return err
}

// run the loop on stream in a separate go routine
// allowing the watch starter to resume control and work with
// managing Watch stream by virtue of passed context
go func() {
// take a snapshot of keyTpe for processing watch
keyType := c.keyType
// ensure closing of the open stream in case of returning from here
// keeping the handles and stack clean
// Note: this may not be required, if loop doesn't require it
// but still it is safe to keep ensuring appropriate cleanup
defer stream.Close(context.Background())
defer func() {
if !errors.Is(ctx.Err(), context.Canceled) {
// panic if the return from this function is not
// due to context being canceled
log.Panicf("End of stream observed due to error %s", stream.Err())
}
}()
for stream.Next(ctx) {
var data bson.M
if err := stream.Decode(&data); err != nil {
log.Printf("Closing watch due to decoding error %s", err)
return
}

op, ok := data["operationType"].(string)
if !ok {
log.Printf("Closing watch due to error, unable to find decode operation type ")
return
}

dk, ok := data["documentKey"].(bson.M)
if !ok {
log.Printf("Closing watch due to error, unable to find key")
return
}

bKey, ok := dk["_id"].(bson.M)
if !ok {
log.Printf("Closing watch due to error, unable to find id")
return
}

// key that will be shared with callback function
var key interface{}
if keyType != nil {
key = reflect.New(keyType.Elem()).Interface()
} else {
key = bson.D{}
}

marshaledData, err := bson.Marshal(bKey)
if err != nil {
log.Printf("Closing watch due to error, while bson Marshal : %q", err)
return
}

err = bson.Unmarshal(marshaledData, key)
if err != nil {
log.Printf("Closing watch due to error, while bson Unmarshal to key : %q", err)
return
}
cb(op, key)
}
}()

return nil
}

type mongoStore struct {
Store
db *mongo.Database
Expand Down
125 changes: 125 additions & 0 deletions db/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package db

import (
"context"
"reflect"
"testing"
"time"
)

type MyKey struct {
Expand Down Expand Up @@ -136,3 +138,126 @@ func Test_ClientConnection(t *testing.T) {
}
})
}

var (
mongoTestAddUpOps int
mongoTestDeleteOps int
)

func myKeyWatcher(op string, wKey interface{}) {
_ = wKey.(*MyKey)
switch op {
case MongoAddOp, MongoUpdateOp:
mongoTestAddUpOps += 1
case MongoDeleteOp:
mongoTestDeleteOps += 1
}
}

func Test_CollectionWatch(t *testing.T) {
t.Run("WatchTest", func(t *testing.T) {
config := &MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := NewMongoClient(config)

if err != nil {
t.Errorf("failed to connect to mongo DB Error: %s", err)
return
}

err = client.HealthCheck(context.Background())
if err != nil {
t.Errorf("failed to perform Health check with DB Error: %s", err)
}

s := client.GetDataStore("test")

col := s.GetCollection("collection1")

err = col.SetKeyType(reflect.TypeOf(MyKey{}))
if err == nil {
t.Errorf("collection should not allow key type, when not a pointer")
}

// set key type to ptr of my key
err = col.SetKeyType(reflect.TypeOf(&MyKey{}))
if err != nil {
t.Errorf("failed to set key type for watch: %s", err)
}

watchCtx, cancelfn := context.WithCancel(context.Background())
defer func() {
time.Sleep(2 * time.Second)
cancelfn()
if mongoTestAddUpOps != 3 {
t.Errorf("Add/Update Notify: Got %d, expected 3", mongoTestAddUpOps)
}
if mongoTestDeleteOps != 2 {
t.Errorf("Delete Notify: Got %d, expected 2", mongoTestDeleteOps)
}
}()
// reset counters
mongoTestAddUpOps = 0
mongoTestDeleteOps = 0
col.Watch(watchCtx, myKeyWatcher)

key := &MyKey{
Name: "test-key",
}
data := &MyData{
Desc: "sample-description",
Val: &InternaData{
Test: "abc",
},
}

err = col.InsertOne(context.Background(), key, data)
if err != nil {
t.Errorf("failed to insert an entry to collection Error: %s", err)
}

val := &MyData{}
err = col.FindOne(context.Background(), key, val)
if err != nil {
t.Errorf("failed to find the entry Error: %s", err)
}

data.Desc = "new description"
data.Val.Test = "xyz"
err = col.UpdateOne(context.Background(), key, data, false)
if err != nil {
t.Errorf("failed to update an entry to collection Error: %s", err)
}

val = &MyData{}
err = col.FindOne(context.Background(), key, val)
if err != nil {
t.Errorf("failed to find the entry Error: %s", err)
}

err = col.DeleteOne(context.Background(), key)
if err != nil {
t.Errorf("failed to delete entry using key Error: %s", err)
}

err = col.DeleteOne(context.Background(), key)
if err == nil {
t.Errorf("attemptting delete on already deleted entry, but didn't receive expected error")
}

err = col.UpdateOne(context.Background(), key, data, true)
if err != nil {
t.Errorf("failed to update an entry to collection Error: %s", err)
}

err = col.DeleteOne(context.Background(), key)
if err != nil {
t.Errorf("failed to delete entry using key Error: %s", err)
}
})
}
17 changes: 17 additions & 0 deletions db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@ package db

import (
"context"
"reflect"
)

// WatchCallbackfn responsible for
type WatchCallbackfn func(op string, key interface{})

// interface definition for a collection in store
type StoreCollection interface {
// Set KeyType for the collection, this is not mandatory
// while the key type will be used by the interface implementer
// mainly for Watch Callback for providing decoded key, if not
// set watch will be working with the default decoders of
// interface implementer
// only pointer key type is supported as of now
// returns error if the key type is not a pointer
SetKeyType(keyType reflect.Type) error

// insert one entry to the collection for the given key and data
InsertOne(ctx context.Context, key interface{}, data interface{}) error

Expand All @@ -30,6 +43,10 @@ type StoreCollection interface {

// remove one entry from the collection matching the given key
DeleteOne(ctx context.Context, key interface{}) error

// watch allows getting notified whenever a change happens to a document
// in the collection
Watch(ctx context.Context, cb WatchCallbackfn) error
}

// interface definition for a store, responsible for holding group
Expand Down
8 changes: 8 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

package errors

import (
base "errors"
)

func Is(err error, target error) bool {
return base.Is(err, target)
}

// get the error code if the error is
// associated to recognizable error types
func getErrCode(err error) ErrCode {
Expand Down