Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Is(err error, target error) bool {

// get the error code if the error is
// associated to recognizable error types
func getErrCode(err error) ErrCode {
func GetErrCode(err error) ErrCode {
val, ok := err.(*Error)
if ok {
return ErrCode(val.code)
Expand Down Expand Up @@ -60,17 +60,17 @@ func Wrapf(code ErrCode, format string, v ...any) error {
// IsNotFound returns true if err
// item isn't found in the space
func IsNotFound(err error) bool {
return getErrCode(err) == NotFound
return GetErrCode(err) == NotFound
}

// IsAlreadyExists returns true if err
// item already exists in the space
func IsAlreadyExists(err error) bool {
return getErrCode(err) == AlreadyExists
return GetErrCode(err) == AlreadyExists
}

// IsInvalidArgument returns true if err
// item is invalid argument
func IsInvalidArgument(err error) bool {
return getErrCode(err) == InvalidArgument
return GetErrCode(err) == InvalidArgument
}
33 changes: 32 additions & 1 deletion lock/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package lock
import (
"context"
"log"
"reflect"
"sync"
"time"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
Expand Down Expand Up @@ -46,6 +48,13 @@ type ownerTableType struct {
updateInterval time.Duration
}

func (t *ownerTableType) DeleteCallback(op string, wKey interface{}) {
key := wKey.(*ownerKey)
if key.Name == t.key.Name {
log.Panicln("receiving delete notification of self lock")
}
}

func (t *ownerTableType) updateLastSeen() {
data := &ownerData{
LastSeen: time.Now().Unix(),
Expand Down Expand Up @@ -87,7 +96,29 @@ func (t *ownerTableType) allocateOwner(name string) error {
Name: id + "-" + uid.String(),
}
}
err := t.col.InsertOne(context.Background(), t.key, data)

matchDeleteStage := mongo.Pipeline{
bson.D{{
Key: "$match",
Value: bson.D{{
Key: "operationType",
Value: "delete",
}},
}},
}

err := t.col.SetKeyType(reflect.TypeOf(&ownerKey{}))
if err != nil {
return errors.Wrapf(errors.GetErrCode(err), "Got error while setting key type for watch notification: %s", err)
}

// watch only for delete notification
err = t.col.Watch(t.ctx, matchDeleteStage, t.DeleteCallback)
if err != nil {
return err
}

err = t.col.InsertOne(context.Background(), t.key, data)
if err != nil {
return err
}
Expand Down
43 changes: 43 additions & 0 deletions lock/owner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package lock

import (
"context"
"testing"
"time"

"github.com/Prabhjot-Sethi/core/db"
)

func Test_OwnerInit(t *testing.T) {
ctx, cancelfn := context.WithCancel(context.Background())
defer time.Sleep(2 * time.Second)
defer cancelfn()
config := &db.MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := db.NewMongoClient(config)

if err != nil {
t.Errorf("failed to connect to mongo DB Error: %s", err)
return
}

err = client.HealthCheck(context.Background())
if err != nil {
t.Errorf("failed to perform Health check with DB Error: %s", err)
}

s := client.GetDataStore("test")
err = InitializeLockOwner(ctx, s, "test-owner")
if err != nil {
t.Errorf("Got error while initializing lock owner %s", err)
}
time.Sleep(1 * time.Second)
}