From 91b5ce38dbeaa367ba931c3571f950b03b41ed94 Mon Sep 17 00:00:00 2001 From: Prabhjot Singh Sethi Date: Tue, 15 Apr 2025 17:31:23 +0000 Subject: [PATCH] Lock Owner to panic upon receiving a self release notification Signed-off-by: Prabhjot Singh Sethi --- errors/errors.go | 8 ++++---- lock/owner.go | 33 ++++++++++++++++++++++++++++++++- lock/owner_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 lock/owner_test.go diff --git a/errors/errors.go b/errors/errors.go index 98681b4..9d2e677 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -14,7 +14,7 @@ func Is(err error, target error) bool { // get the error code if the error is // associated to recognizable error types -func getErrCode(err error) ErrCode { +func GetErrCode(err error) ErrCode { val, ok := err.(*Error) if ok { return ErrCode(val.code) @@ -60,17 +60,17 @@ func Wrapf(code ErrCode, format string, v ...any) error { // IsNotFound returns true if err // item isn't found in the space func IsNotFound(err error) bool { - return getErrCode(err) == NotFound + return GetErrCode(err) == NotFound } // IsAlreadyExists returns true if err // item already exists in the space func IsAlreadyExists(err error) bool { - return getErrCode(err) == AlreadyExists + return GetErrCode(err) == AlreadyExists } // IsInvalidArgument returns true if err // item is invalid argument func IsInvalidArgument(err error) bool { - return getErrCode(err) == InvalidArgument + return GetErrCode(err) == InvalidArgument } diff --git a/lock/owner.go b/lock/owner.go index b4f7f0a..1bc647a 100644 --- a/lock/owner.go +++ b/lock/owner.go @@ -6,11 +6,13 @@ package lock import ( "context" "log" + "reflect" "sync" "time" "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" "github.com/Prabhjot-Sethi/core/db" "github.com/Prabhjot-Sethi/core/errors" @@ -46,6 +48,13 @@ type ownerTableType struct { updateInterval time.Duration } +func (t *ownerTableType) DeleteCallback(op string, wKey interface{}) { + key := wKey.(*ownerKey) + if key.Name == t.key.Name { + log.Panicln("receiving delete notification of self lock") + } +} + func (t *ownerTableType) updateLastSeen() { data := &ownerData{ LastSeen: time.Now().Unix(), @@ -87,7 +96,29 @@ func (t *ownerTableType) allocateOwner(name string) error { Name: id + "-" + uid.String(), } } - err := t.col.InsertOne(context.Background(), t.key, data) + + matchDeleteStage := mongo.Pipeline{ + bson.D{{ + Key: "$match", + Value: bson.D{{ + Key: "operationType", + Value: "delete", + }}, + }}, + } + + err := t.col.SetKeyType(reflect.TypeOf(&ownerKey{})) + if err != nil { + return errors.Wrapf(errors.GetErrCode(err), "Got error while setting key type for watch notification: %s", err) + } + + // watch only for delete notification + err = t.col.Watch(t.ctx, matchDeleteStage, t.DeleteCallback) + if err != nil { + return err + } + + err = t.col.InsertOne(context.Background(), t.key, data) if err != nil { return err } diff --git a/lock/owner_test.go b/lock/owner_test.go new file mode 100644 index 0000000..787a79b --- /dev/null +++ b/lock/owner_test.go @@ -0,0 +1,43 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package lock + +import ( + "context" + "testing" + "time" + + "github.com/Prabhjot-Sethi/core/db" +) + +func Test_OwnerInit(t *testing.T) { + ctx, cancelfn := context.WithCancel(context.Background()) + defer time.Sleep(2 * time.Second) + defer cancelfn() + config := &db.MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := db.NewMongoClient(config) + + if err != nil { + t.Errorf("failed to connect to mongo DB Error: %s", err) + return + } + + err = client.HealthCheck(context.Background()) + if err != nil { + t.Errorf("failed to perform Health check with DB Error: %s", err) + } + + s := client.GetDataStore("test") + err = InitializeLockOwner(ctx, s, "test-owner") + if err != nil { + t.Errorf("Got error while initializing lock owner %s", err) + } + time.Sleep(1 * time.Second) +}