From dec93d27a4ceb087430f63fe72fed2c9ecffe4ce Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Mon, 5 Oct 2020 22:04:20 +0530 Subject: [PATCH 01/13] this commit implements a basic structure for sessions --- go.mod | 1 + go.sum | 2 + internal/lockclient/client.go | 9 ++++- internal/lockclient/session/session.go | 18 +++++++++ internal/lockclient/session/simple_session.go | 37 +++++++++++++++++++ internal/lockclient/simple_client.go | 21 +++++++++++ internal/lockservice/simpleLockService.go | 6 +-- 7 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 internal/lockclient/session/session.go create mode 100644 internal/lockclient/session/simple_session.go diff --git a/go.mod b/go.mod index ae41604..a015840 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/mux v1.7.4 github.com/kr/pretty v0.1.0 // indirect + github.com/oklog/ulid v1.3.1 github.com/rs/zerolog v1.19.0 github.com/stretchr/testify v1.6.1 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index f5cf236..d3239f6 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/internal/lockclient/client.go b/internal/lockclient/client.go index 82575f7..7a5afab 100644 --- a/internal/lockclient/client.go +++ b/internal/lockclient/client.go @@ -1,6 +1,9 @@ package lockclient -import "github.com/SystemBuilders/LocKey/internal/lockservice" +import ( + "github.com/SystemBuilders/LocKey/internal/lockclient/session" + "github.com/SystemBuilders/LocKey/internal/lockservice" +) // Client describes a client that can be used to interact with // the Lockey lockservice. The client can start the lockservice @@ -17,6 +20,10 @@ type Client interface { // to do so. Starting the service should be a non-blocking call // and return as soon as the server is started and setup. StartService(Config) error + // Connect allows the user process to establish a connection + // with the client. This returns an ID of the session that + // results from the connection. + Connect() session.Session // Acquire can be used to acquire a lock on Lockey. This // implementation interacts with the underlying server and // provides the service. diff --git a/internal/lockclient/session/session.go b/internal/lockclient/session/session.go new file mode 100644 index 0000000..b24846b --- /dev/null +++ b/internal/lockclient/session/session.go @@ -0,0 +1,18 @@ +package session + +import "github.com/oklog/ulid" + +// Session captures all necessary parameters necessary to +// describe a session with the lockservice in the lockclient. +type Session interface { + // SessionID is the unique ID that represents this session. + // This will be used in every transaction for validating the user. + SessionID() ulid.ULID + // ClientID is the ID of the client that will be created when + // the client is created. This acts as a second layer check along + // with the sessionID. + ClientID() ulid.ULID + // ProcessID the unique ID assigned for the process by the client. + // This will be the third layer check in the security mechanism. + ProcessID() ulid.ULID +} diff --git a/internal/lockclient/session/simple_session.go b/internal/lockclient/session/simple_session.go new file mode 100644 index 0000000..0e1823f --- /dev/null +++ b/internal/lockclient/session/simple_session.go @@ -0,0 +1,37 @@ +package session + +import ( + "github.com/oklog/ulid" +) + +var _ Session = (*SimpleSession)(nil) + +// SimpleSession implements a session. +type SimpleSession struct { + sessionID ulid.ULID + clientID ulid.ULID + processID ulid.ULID +} + +// SessionID returns the sessionID of the SimpleSession. +func (s *SimpleSession) SessionID() ulid.ULID { + return s.sessionID +} + +// ClientID returns the clientID of the SimpleSession. +func (s *SimpleSession) ClientID() ulid.ULID { + return s.clientID +} + +// ProcessID returns the processID of the SimpleSession +func (s *SimpleSession) ProcessID() ulid.ULID { + return s.processID +} + +func NewSession(sessionID, clientID, processID ulid.ULID) Session { + return &SimpleSession{ + sessionID: sessionID, + clientID: clientID, + processID: processID, + } +} diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 131823c..233cc7c 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -5,12 +5,16 @@ import ( "encoding/json" "errors" "io/ioutil" + "math/rand" "net/http" "strings" "sync" + "time" "github.com/SystemBuilders/LocKey/internal/cache" + "github.com/SystemBuilders/LocKey/internal/lockclient/session" "github.com/SystemBuilders/LocKey/internal/lockservice" + "github.com/oklog/ulid" ) var _ Config = (*lockservice.SimpleConfig)(nil) @@ -20,6 +24,7 @@ type SimpleClient struct { config *lockservice.SimpleConfig cache *cache.LRUCache mu sync.Mutex + id ulid.ULID } // NewSimpleClient returns a new SimpleClient of the given parameters. @@ -33,6 +38,16 @@ func NewSimpleClient(config *lockservice.SimpleConfig, cache *cache.LRUCache) *S var _ Client = (*SimpleClient)(nil) +// Connect lets the user process to establish a connection with the +// client. +func (sc *SimpleClient) Connect() session.Session { + sessionID := createUniqueID() + processID := createUniqueID() + session := session.NewSession(sessionID, sc.id, processID) + + return session +} + // Acquire allows the user process to acquire a lock. func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { return sc.acquire(d) @@ -219,3 +234,9 @@ func (sc *SimpleClient) releaseFromCache(d lockservice.Descriptors) error { } return cache.ErrCacheDoesntExist } + +func createUniqueID() ulid.ULID { + t := time.Unix(1000000, 0) + entropy := ulid.Monotonic(rand.New(rand.NewSource(t.UnixNano())), 0) + return ulid.MustNew(ulid.Timestamp(t), entropy) +} diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index bb61b5d..cafa2e2 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -20,13 +20,13 @@ type SimpleConfig struct { // LockRequest is an instance of a request for a lock. type LockRequest struct { - FileID string `json:"FileID"` - UserID string `json:"UserID"` + FileID string `json:"fileID"` + UserID string `json:"userID"` } // LockCheckRequest is an instance of a lock check request. type LockCheckRequest struct { - FileID string `json:"FileID"` + FileID string `json:"fileID"` } // CheckAcquireRes is the response of a Checkacquire. From eb89a513110470f85999c949e5b11e7e12e6c6ae Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Wed, 7 Oct 2020 12:36:41 +0530 Subject: [PATCH 02/13] this commit adds a part of support for sessions in the lock client: timer implementaion, splitting of domains for acquire and release, addition of context --- internal/lockclient/client.go | 4 +- internal/lockclient/errors.go | 13 ++++ internal/lockclient/simple_client.go | 105 +++++++++++++++++++++++---- 3 files changed, 107 insertions(+), 15 deletions(-) create mode 100644 internal/lockclient/errors.go diff --git a/internal/lockclient/client.go b/internal/lockclient/client.go index 7a5afab..2afad43 100644 --- a/internal/lockclient/client.go +++ b/internal/lockclient/client.go @@ -27,11 +27,11 @@ type Client interface { // Acquire can be used to acquire a lock on Lockey. This // implementation interacts with the underlying server and // provides the service. - Acquire(lockservice.Descriptors) error + Acquire(lockservice.Descriptors, session.Session) error // Release can be used to release a lock on Lockey. This // implementation interacts with the underlying server and // provides the service. - Release(lockservice.Descriptors) error + Release(lockservice.Descriptors, session.Session) error } // Config describes the configuration for the lockservice to run on. diff --git a/internal/lockclient/errors.go b/internal/lockclient/errors.go new file mode 100644 index 0000000..a69b178 --- /dev/null +++ b/internal/lockclient/errors.go @@ -0,0 +1,13 @@ +package lockclient + +// Error provides constant error strings to the driver functions. +type Error string + +func (e Error) Error() string { return string(e) } + +// Constant errors. +// Rule of thumb, all errors start with a small letter and end with no full stop. +const ( + ErrSessionInexistent = Error("the session related to this process doesn't exist") + SessionExpired = Error("session expired") +) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 233cc7c..30737fe 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -2,6 +2,7 @@ package lockclient import ( "bytes" + "context" "encoding/json" "errors" "io/ioutil" @@ -21,18 +22,26 @@ var _ Config = (*lockservice.SimpleConfig)(nil) // SimpleClient implements Client, the lockclient for LocKey. type SimpleClient struct { - config *lockservice.SimpleConfig - cache *cache.LRUCache - mu sync.Mutex - id ulid.ULID + config *lockservice.SimpleConfig + cache *cache.LRUCache + mu sync.Mutex + id ulid.ULID + sessions map[ulid.ULID]session.Session + sessionTimers map[ulid.ULID]chan struct{} } // NewSimpleClient returns a new SimpleClient of the given parameters. // This client works with or without the existance of a cache. func NewSimpleClient(config *lockservice.SimpleConfig, cache *cache.LRUCache) *SimpleClient { + clientID := createUniqueID() + sessions := make(map[ulid.ULID]session.Session) + sessionTimers := make(map[ulid.ULID]chan struct{}) return &SimpleClient{ - config: config, - cache: cache, + config: config, + cache: cache, + id: clientID, + sessions: sessions, + sessionTimers: sessionTimers, } } @@ -44,22 +53,56 @@ func (sc *SimpleClient) Connect() session.Session { sessionID := createUniqueID() processID := createUniqueID() session := session.NewSession(sessionID, sc.id, processID) - + sc.sessions[processID] = session + sc.startSession(processID) return session } // Acquire allows the user process to acquire a lock. -func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { - return sc.acquire(d) +// This returns a "session expired" error if the session expires when +// the lock is being acquired. +// +// All locks acquired during the session will be revoked if the session +// expires. +func (sc *SimpleClient) Acquire(d lockservice.Descriptors, s session.Session) error { + if _, ok := sc.sessions[s.ProcessID()]; !ok { + return ErrSessionInexistent + } + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + for { + select { + case <-sc.sessionTimers[s.ProcessID()]: + cancel() + sc.gracefulSessionShutDown(s.ProcessID()) + default: + } + } + }() + return sc.acquire(ctx, d) } // acquire makes an HTTP call to the lockserver and acquires the lock. // This function makes the acquire call and doesn't care about the contention // on the lock service. -// The errors involved may be due the HTTP errors or the lockservice errors. +// The errors involved may be due the HTTP, cache or the lockservice errors. // -// Currently acquire doesn't order the user processes that request for the lock. -func (sc *SimpleClient) acquire(d lockservice.Descriptors) error { +// This function doesn't care about sessions or ordering of the user processes and +// thus can be used for book-keeping purposes. +func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) (err error) { + + go func() { + for { + select { + case <-ctx.Done(): + err = SessionExpired + return + default: + } + } + }() + // Check for existance of a cache and check // if the element is in the cache. if sc.cache != nil { @@ -107,7 +150,22 @@ func (sc *SimpleClient) acquire(d lockservice.Descriptors) error { // Release makes an HTTP call to the lockserver and releases the lock. // The errors invloved may be due the HTTP errors or the lockservice errors. -func (sc *SimpleClient) Release(d lockservice.Descriptors) error { +// +// Only if there is an active session by the user process, it can release the locks +// once verified that the locks belong to the user process. +func (sc *SimpleClient) Release(d lockservice.Descriptors, s session.Session) error { + ctx := context.Background() + return sc.release(ctx, d) +} + +// release makes a HTTP call to the lock service and releases the lock. +// This function makes the release call and doesn't care about the contention +// on the lock service. +// The errors involved maybe the HTTP, cache or the lockservice errors. +// +// This function doesn't care about sessions or ordering of the user processes and +// thus can be used for book-keeping purposes. +func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) error { endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} requestJSON, err := json.Marshal(data) @@ -235,6 +293,27 @@ func (sc *SimpleClient) releaseFromCache(d lockservice.Descriptors) error { return cache.ErrCacheDoesntExist } +// startSession starts the session by initiating the timer for this user process. +// This is a non blocking function which runs on a different goroutine. It sends +// a signal through the "sessionTimers" map for the respective "processID" when +// the session timer ends. +// +// The function starts with creating a new channel, assigning it to the respective +// object in the map and then ends by closing the channel created. +func (sc *SimpleClient) startSession(processID ulid.ULID) { + go func(ulid.ULID) { + timerChan := make(chan struct{}, 1) + sc.sessionTimers[processID] = timerChan + time.Sleep(200 * time.Millisecond) + sc.sessionTimers[processID] <- struct{}{} + close(sc.sessionTimers[processID]) + }(processID) +} + +func (sc *SimpleClient) gracefulSessionShutDown(processID ulid.ULID) { + +} + func createUniqueID() ulid.ULID { t := time.Unix(1000000, 0) entropy := ulid.Monotonic(rand.New(rand.NewSource(t.UnixNano())), 0) From 635aa9f758bb90f3e45ee41304d22b009d8c8bb3 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Thu, 8 Oct 2020 11:36:44 +0530 Subject: [PATCH 03/13] this commit implements all basic needs for a session implementation in the lock client: gracefulShutdown, maintaining list of current locks --- internal/lockclient/simple_client.go | 107 ++++++++++++++++++++++----- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 30737fe..4b469d0 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -22,12 +22,20 @@ var _ Config = (*lockservice.SimpleConfig)(nil) // SimpleClient implements Client, the lockclient for LocKey. type SimpleClient struct { - config *lockservice.SimpleConfig - cache *cache.LRUCache - mu sync.Mutex - id ulid.ULID - sessions map[ulid.ULID]session.Session + config *lockservice.SimpleConfig + cache *cache.LRUCache + mu sync.Mutex + id ulid.ULID + + // sessions holds the mapping of a process to a session. + sessions map[ulid.ULID]session.Session + // sessionTimers maintains the timers for each session, sessionTimers map[ulid.ULID]chan struct{} + // sessionAcquisitions has a list of all the acquisitions + // from a particular process. This has no knowledge of + // whether the process owning the lock has an active session + // or not, this guarantee has to be ensured by the client. + sessionAcquisitions map[ulid.ULID][]lockservice.Descriptors } // NewSimpleClient returns a new SimpleClient of the given parameters. @@ -80,7 +88,15 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors, s session.Session) er } } }() - return sc.acquire(ctx, d) + err := sc.acquire(ctx, d) + if err != nil { + return err + } + // Once the lock is guaranteed to be acquired, append it to the acquisitions list. + sc.mu.Lock() + sc.sessionAcquisitions[s.ProcessID()] = append(sc.sessionAcquisitions[s.ProcessID()], d) + sc.mu.Unlock() + return nil } // acquire makes an HTTP call to the lockserver and acquires the lock. @@ -89,19 +105,21 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors, s session.Session) er // The errors involved may be due the HTTP, cache or the lockservice errors. // // This function doesn't care about sessions or ordering of the user processes and -// thus can be used for book-keeping purposes. +// thus can be used for book-keeping purposes using a nil context. func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) (err error) { - go func() { - for { - select { - case <-ctx.Done(): - err = SessionExpired - return - default: + if ctx != nil { + go func() { + for { + select { + case <-ctx.Done(): + err = SessionExpired + return + default: + } } - } - }() + }() + } // Check for existance of a cache and check // if the element is in the cache. @@ -154,8 +172,29 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) // Only if there is an active session by the user process, it can release the locks // once verified that the locks belong to the user process. func (sc *SimpleClient) Release(d lockservice.Descriptors, s session.Session) error { + if _, ok := sc.sessions[s.ProcessID()]; !ok { + return ErrSessionInexistent + } ctx := context.Background() - return sc.release(ctx, d) + ctx, cancel := context.WithCancel(ctx) + go func() { + for { + select { + case <-sc.sessionTimers[s.ProcessID()]: + cancel() + sc.gracefulSessionShutDown(s.ProcessID()) + default: + } + } + }() + + err := sc.release(ctx, d) + if err != nil { + return err + } + // Remove the descriptor that was released. + sc.removeFromSlice(s.ProcessID(), d) + return nil } // release makes a HTTP call to the lock service and releases the lock. @@ -164,8 +203,23 @@ func (sc *SimpleClient) Release(d lockservice.Descriptors, s session.Session) er // The errors involved maybe the HTTP, cache or the lockservice errors. // // This function doesn't care about sessions or ordering of the user processes and -// thus can be used for book-keeping purposes. -func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) error { +// thus can be used for book-keeping purposes using a nil context. +// TODO: Cache invalidation +func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) (err error) { + + if ctx != nil { + go func() { + for { + select { + case <-ctx.Done(): + err = SessionExpired + return + default: + } + } + }() + } + endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} requestJSON, err := json.Marshal(data) @@ -304,14 +358,29 @@ func (sc *SimpleClient) startSession(processID ulid.ULID) { go func(ulid.ULID) { timerChan := make(chan struct{}, 1) sc.sessionTimers[processID] = timerChan + // Sessions last for 200ms. time.Sleep(200 * time.Millisecond) sc.sessionTimers[processID] <- struct{}{} close(sc.sessionTimers[processID]) }(processID) } +// gracefulSessionShutdown releases all the locks in the lockservice once the +// session has ended. func (sc *SimpleClient) gracefulSessionShutDown(processID ulid.ULID) { + for i := range sc.sessionAcquisitions[processID] { + sc.release(nil, sc.sessionAcquisitions[processID][i]) + } +} +func (sc *SimpleClient) removeFromSlice(processID ulid.ULID, d lockservice.Descriptors) { + sc.mu.Lock() + for i := range sc.sessionAcquisitions[processID] { + if sc.sessionAcquisitions[processID][i] == d { + sc.sessionAcquisitions[processID] = append(sc.sessionAcquisitions[processID][:i], sc.sessionAcquisitions[processID][i+1:]...) + } + } + sc.mu.Unlock() } func createUniqueID() ulid.ULID { From 75711337a6dceade51647f331b1865f47e3a57b7 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Sat, 10 Oct 2020 21:51:01 +0530 Subject: [PATCH 04/13] this commit aligns older tests to use sessions and moves to ObjectDescriptor and LockDescriptor separation, solving #29 --- internal/lockclient/client.go | 4 +- internal/lockclient/id/id.go | 64 ++++++++++ internal/lockclient/session/session.go | 8 +- internal/lockclient/session/simple_session.go | 17 +-- internal/lockclient/simple_client.go | 120 ++++++++++-------- internal/lockclient/simple_client_test.go | 81 +++++++----- internal/lockservice/lockservice.go | 5 + internal/lockservice/simpleLockService.go | 6 + 8 files changed, 202 insertions(+), 103 deletions(-) create mode 100644 internal/lockclient/id/id.go diff --git a/internal/lockclient/client.go b/internal/lockclient/client.go index 2afad43..024e686 100644 --- a/internal/lockclient/client.go +++ b/internal/lockclient/client.go @@ -27,11 +27,11 @@ type Client interface { // Acquire can be used to acquire a lock on Lockey. This // implementation interacts with the underlying server and // provides the service. - Acquire(lockservice.Descriptors, session.Session) error + Acquire(lockservice.Object, session.Session) error // Release can be used to release a lock on Lockey. This // implementation interacts with the underlying server and // provides the service. - Release(lockservice.Descriptors, session.Session) error + Release(lockservice.Object, session.Session) error } // Config describes the configuration for the lockservice to run on. diff --git a/internal/lockclient/id/id.go b/internal/lockclient/id/id.go new file mode 100644 index 0000000..5f2e952 --- /dev/null +++ b/internal/lockclient/id/id.go @@ -0,0 +1,64 @@ +package id + +import ( + "fmt" + "log" + "math/rand" + "sync" + "time" + + "github.com/oklog/ulid" +) + +// ID describes a general identifier. An ID has to be unique application-wide. +// IDs must not be re-used. +type ID interface { + fmt.Stringer + Bytes() []byte +} + +var _ ID = (*id)(nil) + +type id ulid.ULID + +var ( + lock sync.Mutex + randSource = rand.New(rand.NewSource(time.Now().UnixNano())) + entropy = ulid.Monotonic(randSource, 0) +) + +// Create creates a globally unique ID. This function is safe for concurrent +// use. +func Create() ID { + lock.Lock() + defer lock.Unlock() + + genID, err := ulid.New(ulid.Now(), entropy) + if err != nil { + // For this to happen, the random module would have to fail. Since we + // use Go's pseudo RNG, which just jumps around a few numbers, instead + // of using crypto/rand, and we also made this function safe for + // concurrent use, this is nearly impossible to happen. However, with + // the current version of oklog/ulid v1.3.1, this will also break after + // 2121-04-11 11:53:25.01172576 UTC. + log.Fatal(fmt.Errorf("new ulid: %w", err)) + } + return id(genID) +} + +// Parse parses an ID from a byte slice. +func Parse(idBytes []byte) (ID, error) { + parsed, err := ulid.Parse(string(idBytes)) + if err != nil { + return nil, fmt.Errorf("parse: %w", err) + } + return id(parsed), nil +} + +func (id id) String() string { + return ulid.ULID(id).String() +} + +func (id id) Bytes() []byte { + return []byte(id.String()) +} diff --git a/internal/lockclient/session/session.go b/internal/lockclient/session/session.go index b24846b..40ea12f 100644 --- a/internal/lockclient/session/session.go +++ b/internal/lockclient/session/session.go @@ -1,18 +1,18 @@ package session -import "github.com/oklog/ulid" +import "github.com/SystemBuilders/LocKey/internal/lockclient/id" // Session captures all necessary parameters necessary to // describe a session with the lockservice in the lockclient. type Session interface { // SessionID is the unique ID that represents this session. // This will be used in every transaction for validating the user. - SessionID() ulid.ULID + SessionID() id.ID // ClientID is the ID of the client that will be created when // the client is created. This acts as a second layer check along // with the sessionID. - ClientID() ulid.ULID + ClientID() id.ID // ProcessID the unique ID assigned for the process by the client. // This will be the third layer check in the security mechanism. - ProcessID() ulid.ULID + ProcessID() id.ID } diff --git a/internal/lockclient/session/simple_session.go b/internal/lockclient/session/simple_session.go index 0e1823f..01ea2c7 100644 --- a/internal/lockclient/session/simple_session.go +++ b/internal/lockclient/session/simple_session.go @@ -1,34 +1,35 @@ package session import ( - "github.com/oklog/ulid" + "github.com/SystemBuilders/LocKey/internal/lockclient/id" ) var _ Session = (*SimpleSession)(nil) // SimpleSession implements a session. type SimpleSession struct { - sessionID ulid.ULID - clientID ulid.ULID - processID ulid.ULID + sessionID id.ID + clientID id.ID + processID id.ID } // SessionID returns the sessionID of the SimpleSession. -func (s *SimpleSession) SessionID() ulid.ULID { +func (s *SimpleSession) SessionID() id.ID { return s.sessionID } // ClientID returns the clientID of the SimpleSession. -func (s *SimpleSession) ClientID() ulid.ULID { +func (s *SimpleSession) ClientID() id.ID { return s.clientID } // ProcessID returns the processID of the SimpleSession -func (s *SimpleSession) ProcessID() ulid.ULID { +func (s *SimpleSession) ProcessID() id.ID { return s.processID } -func NewSession(sessionID, clientID, processID ulid.ULID) Session { +// NewSession returns a new instance of a session with the given parameters. +func NewSession(sessionID, clientID, processID id.ID) Session { return &SimpleSession{ sessionID: sessionID, clientID: clientID, diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 50eeac6..f470655 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -6,16 +6,16 @@ import ( "encoding/json" "errors" "io/ioutil" - "math/rand" "net/http" "strings" "sync" "time" + "github.com/SystemBuilders/LocKey/internal/lockclient/id" + "github.com/SystemBuilders/LocKey/internal/lockclient/cache" "github.com/SystemBuilders/LocKey/internal/lockclient/session" "github.com/SystemBuilders/LocKey/internal/lockservice" - "github.com/oklog/ulid" ) var _ Config = (*lockservice.SimpleConfig)(nil) @@ -25,31 +25,33 @@ type SimpleClient struct { config *lockservice.SimpleConfig cache *cache.LRUCache mu sync.Mutex - id ulid.ULID + id id.ID // sessions holds the mapping of a process to a session. - sessions map[ulid.ULID]session.Session + sessions map[id.ID]session.Session // sessionTimers maintains the timers for each session, - sessionTimers map[ulid.ULID]chan struct{} + sessionTimers map[id.ID]chan struct{} // sessionAcquisitions has a list of all the acquisitions // from a particular process. This has no knowledge of // whether the process owning the lock has an active session // or not, this guarantee has to be ensured by the client. - sessionAcquisitions map[ulid.ULID][]lockservice.Descriptors + sessionAcquisitions map[id.ID][]lockservice.Descriptors } // NewSimpleClient returns a new SimpleClient of the given parameters. // This client works with or without the existance of a cache. func NewSimpleClient(config *lockservice.SimpleConfig, cache *cache.LRUCache) *SimpleClient { - clientID := createUniqueID() - sessions := make(map[ulid.ULID]session.Session) - sessionTimers := make(map[ulid.ULID]chan struct{}) + clientID := id.Create() + sessions := make(map[id.ID]session.Session) + sessionTimers := make(map[id.ID]chan struct{}) + sessionAcquisitions := make(map[id.ID][]lockservice.Descriptors) return &SimpleClient{ - config: config, - cache: cache, - id: clientID, - sessions: sessions, - sessionTimers: sessionTimers, + config: config, + cache: cache, + id: clientID, + sessions: sessions, + sessionTimers: sessionTimers, + sessionAcquisitions: sessionAcquisitions, } } @@ -58,8 +60,8 @@ var _ Client = (*SimpleClient)(nil) // Connect lets the user process to establish a connection with the // client. func (sc *SimpleClient) Connect() session.Session { - sessionID := createUniqueID() - processID := createUniqueID() + sessionID := id.Create() + processID := id.Create() session := session.NewSession(sessionID, sc.id, processID) sc.sessions[processID] = session sc.startSession(processID) @@ -72,7 +74,7 @@ func (sc *SimpleClient) Connect() session.Session { // // All locks acquired during the session will be revoked if the session // expires. -func (sc *SimpleClient) Acquire(d lockservice.Descriptors, s session.Session) error { +func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { if _, ok := sc.sessions[s.ProcessID()]; !ok { return ErrSessionInexistent } @@ -80,21 +82,24 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors, s session.Session) er ctx, cancel := context.WithCancel(ctx) go func() { for { + sc.mu.Lock() select { case <-sc.sessionTimers[s.ProcessID()]: cancel() sc.gracefulSessionShutDown(s.ProcessID()) default: } + sc.mu.Unlock() } }() - err := sc.acquire(ctx, d) + ld := lockservice.NewLockDescriptor(d.ID(), s.ProcessID().String()) + err := sc.acquire(ctx, ld) if err != nil { return err } // Once the lock is guaranteed to be acquired, append it to the acquisitions list. sc.mu.Lock() - sc.sessionAcquisitions[s.ProcessID()] = append(sc.sessionAcquisitions[s.ProcessID()], d) + sc.sessionAcquisitions[s.ProcessID()] = append(sc.sessionAcquisitions[s.ProcessID()], ld) sc.mu.Unlock() return nil } @@ -136,23 +141,28 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) endPoint := sc.config.IP() + ":" + sc.config.Port() + "/acquire" // Since the cache doesn't have the element, query the server. testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJSON, err := json.Marshal(testData) + requestJSON, ok := json.Marshal(testData) + if ok != nil { + return ok + } - req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) - if err != nil { - return err + req, ok := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) + if ok != nil { + return ok } req.Header.Set("Content-Type", "application/json") client := &http.Client{} - resp, err := client.Do(req) - - if err != nil { - return err + resp, ok := client.Do(req) + if ok != nil { + return ok } defer resp.Body.Close() - body, _ := ioutil.ReadAll(resp.Body) + body, ok := ioutil.ReadAll(resp.Body) + if ok != nil { + return ok + } if resp.StatusCode != 200 { return errors.New(strings.TrimSpace(string(body))) } @@ -171,7 +181,7 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) // // Only if there is an active session by the user process, it can release the locks // once verified that the locks belong to the user process. -func (sc *SimpleClient) Release(d lockservice.Descriptors, s session.Session) error { +func (sc *SimpleClient) Release(d lockservice.Object, s session.Session) error { if _, ok := sc.sessions[s.ProcessID()]; !ok { return ErrSessionInexistent } @@ -179,21 +189,23 @@ func (sc *SimpleClient) Release(d lockservice.Descriptors, s session.Session) er ctx, cancel := context.WithCancel(ctx) go func() { for { + sc.mu.Lock() select { case <-sc.sessionTimers[s.ProcessID()]: cancel() sc.gracefulSessionShutDown(s.ProcessID()) default: } + sc.mu.Unlock() } }() - - err := sc.release(ctx, d) + ld := lockservice.NewLockDescriptor(d.ID(), s.ProcessID().String()) + err := sc.release(ctx, ld) if err != nil { return err } // Remove the descriptor that was released. - sc.removeFromSlice(s.ProcessID(), d) + sc.removeFromSlice(s.ProcessID(), ld) return nil } @@ -222,34 +234,34 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJSON, err := json.Marshal(data) - if err != nil { - return err + requestJSON, ok := json.Marshal(data) + if ok != nil { + return ok } - req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) - if err != nil { - return err + req, ok := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) + if ok != nil { + return ok } req.Header.Set("Content-Type", "application/json") client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return (err) + resp, ok := client.Do(req) + if ok != nil { + return ok } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err + body, ok := ioutil.ReadAll(resp.Body) + if ok != nil { + return ok } if resp.StatusCode != 200 { return lockservice.Error(strings.TrimSpace(string(body))) } if sc.cache != nil { - err = sc.releaseFromCache(d) + err := sc.releaseFromCache(d) if err != nil { return err } @@ -354,26 +366,30 @@ func (sc *SimpleClient) releaseFromCache(d lockservice.Descriptors) error { // // The function starts with creating a new channel, assigning it to the respective // object in the map and then ends by closing the channel created. -func (sc *SimpleClient) startSession(processID ulid.ULID) { - go func(ulid.ULID) { +func (sc *SimpleClient) startSession(processID id.ID) { + go func(id.ID) { timerChan := make(chan struct{}, 1) + sc.mu.Lock() sc.sessionTimers[processID] = timerChan + sc.mu.Unlock() // Sessions last for 200ms. time.Sleep(200 * time.Millisecond) + sc.mu.Lock() sc.sessionTimers[processID] <- struct{}{} + sc.mu.Unlock() close(sc.sessionTimers[processID]) }(processID) } // gracefulSessionShutdown releases all the locks in the lockservice once the // session has ended. -func (sc *SimpleClient) gracefulSessionShutDown(processID ulid.ULID) { +func (sc *SimpleClient) gracefulSessionShutDown(processID id.ID) { for i := range sc.sessionAcquisitions[processID] { sc.release(nil, sc.sessionAcquisitions[processID][i]) } } -func (sc *SimpleClient) removeFromSlice(processID ulid.ULID, d lockservice.Descriptors) { +func (sc *SimpleClient) removeFromSlice(processID id.ID, d lockservice.Descriptors) { sc.mu.Lock() for i := range sc.sessionAcquisitions[processID] { if sc.sessionAcquisitions[processID][i] == d { @@ -382,9 +398,3 @@ func (sc *SimpleClient) removeFromSlice(processID ulid.ULID, d lockservice.Descr } sc.mu.Unlock() } - -func createUniqueID() ulid.ULID { - t := time.Unix(1000000, 0) - entropy := ulid.Monotonic(rand.New(rand.NewSource(t.UnixNano())), 0) - return ulid.MustNew(ulid.Timestamp(t), entropy) -} diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index adeed53..aa1c6c1 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -31,40 +31,43 @@ func TestLockService(t *testing.T) { } }() - // Server takes some time to start + // Server takes some time to start. time.Sleep(100 * time.Millisecond) + + // Flow of creating a client and acquiring a lock: + // 1. Create a cache for the client. + // 2. Create a client and plug in the created cache. + // 3. Connect to the said client and hold on to the session value. + // 4. Use the session as a key for all further transactions. t.Run("acquire test release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) sc := NewSimpleClient(scfg, cache) - d := lockservice.NewLockDescriptor("test", "owner") + session := sc.Connect() + + d := lockservice.NewObjectDescriptor("test") - got := sc.Acquire(d) + got := sc.Acquire(d, session) var want error if got != want { t.Errorf("acquire: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test1", "owner") - - got = sc.Acquire(d) + d = lockservice.NewObjectDescriptor("test1") + got = sc.Acquire(d, session) if got != want { t.Errorf("acquire: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test", "owner") - - got = sc.Release(d) - + d = lockservice.NewObjectDescriptor("test") + got = sc.Release(d, session) if got != want { t.Errorf("release: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test1", "owner") - - got = sc.Release(d) - + d = lockservice.NewObjectDescriptor("test1") + got = sc.Release(d, session) if got != want { t.Errorf("release: got %q want %q", got, want) } @@ -75,23 +78,23 @@ func TestLockService(t *testing.T) { cache := cache.NewLRUCache(size) sc := NewSimpleClient(scfg, cache) - d := lockservice.NewLockDescriptor("test", "owner") + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test") - got := sc.Acquire(d) + got := sc.Acquire(d, session) var want error if got != want { t.Errorf("acquire: got %q want %q", got, want) } - got = sc.Acquire(d) + session2 := sc.Connect() + got = sc.Acquire(d, session2) want = lockservice.ErrFileacquired if got.Error() != want.Error() { t.Errorf("acquire: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test", "owner") - - got = sc.Release(d) + got = sc.Release(d, session) want = nil if got != want { t.Errorf("release: got %q want %q", got, want) @@ -103,29 +106,37 @@ func TestLockService(t *testing.T) { cache := cache.NewLRUCache(size) sc := NewSimpleClient(scfg, cache) - d := lockservice.NewLockDescriptor("test", "owner1") - got := sc.Acquire(d) + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test") + got := sc.Acquire(d, session) var want error if got != want { t.Errorf("acquire: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test", "owner2") - got = sc.Release(d) + session2 := sc.Connect() + got = sc.Release(d, session2) want = lockservice.ErrUnauthorizedAccess if got != want { t.Errorf("acquire: got %v want %v", got, want) } - d = lockservice.NewLockDescriptor("test2", "owner1") - got = sc.Acquire(d) + d = lockservice.NewObjectDescriptor("test2") + got = sc.Acquire(d, session) want = nil if got != want { t.Errorf("acquire: got %q want %q", got, want) } - d = lockservice.NewLockDescriptor("test", "owner1") - got = sc.Release(d) + d = lockservice.NewObjectDescriptor("test") + got = sc.Release(d, session) + want = nil + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + + d = lockservice.NewObjectDescriptor("test2") + got = sc.Release(d, session) want = nil if got != want { t.Errorf("release: got %q want %q", got, want) @@ -158,15 +169,16 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) { time.Sleep(100 * time.Millisecond) sc := NewSimpleClient(scfg, nil) - d := lockservice.NewLockDescriptor("test", "owner") + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { - got := sc.acquire(d) + got := sc.Acquire(d, session) var want error if got != want { b.Errorf("acquire: got %q want %q", got, want) } - got = sc.Release(d) + got = sc.Release(d, session) if got != want { b.Errorf("release: got %q want %q", got, want) } @@ -197,15 +209,16 @@ func BenchmarkLocKeyWithCache(b *testing.B) { size := 5 cache := cache.NewLRUCache(size) sc := NewSimpleClient(scfg, cache) - d := lockservice.NewLockDescriptor("test", "owner") + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { - got := sc.acquire(d) + got := sc.Acquire(d, session) var want error if got != want { b.Errorf("acquire: got %q want %q", got, want) } - got = sc.Release(d) + got = sc.Release(d, session) if got != want { b.Errorf("release: got %q want %q", got, want) } diff --git a/internal/lockservice/lockservice.go b/internal/lockservice/lockservice.go index d21959f..76e823a 100644 --- a/internal/lockservice/lockservice.go +++ b/internal/lockservice/lockservice.go @@ -26,3 +26,8 @@ type Descriptors interface { ID() string Owner() string } + +// Object describes any object that can be used with the lockservice. +type Object interface { + ID() string +} diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index cafa2e2..dc2fa9f 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -55,6 +55,7 @@ type SimpleLockService struct { } var _ Descriptors = (*LockDescriptor)(nil) +var _ Object = (*ObjectDescriptor)(nil) // ObjectDescriptor describes the object that is subjected to // lock operations. @@ -62,6 +63,11 @@ type ObjectDescriptor struct { ObjectID string } +// ID returns the ID related to the object. +func (od *ObjectDescriptor) ID() string { + return od.ObjectID +} + // LockDescriptor implements the Descriptors interface. // Many descriptors can be added to this struct and the ID // can be a combination of all those descriptors. From f3e07d783bc640db241ff75465b44c5a5c022ed5 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Sun, 11 Oct 2020 14:27:24 +0530 Subject: [PATCH 05/13] this commit fixes a race condition when session times out --- internal/lockclient/simple_client.go | 198 +++++++++++++++++---------- 1 file changed, 122 insertions(+), 76 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index f470655..e13e893 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -111,69 +111,94 @@ func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { // // This function doesn't care about sessions or ordering of the user processes and // thus can be used for book-keeping purposes using a nil context. -func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) (err error) { +func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) error { + errChan := make(chan error, 1) if ctx != nil { go func() { for { select { case <-ctx.Done(): - err = SessionExpired - return + errChan <- SessionExpired default: } } }() } - // Check for existance of a cache and check - // if the element is in the cache. - if sc.cache != nil { - _, err := sc.getFromCache(lockservice.ObjectDescriptor{ObjectID: d.ID()}) - // Since there can be cache errors, we have this double check. - // We need to exit if a cache doesn't exist but proceed if the cache - // failed in persisting this element. - if err != nil && err != lockservice.ErrCheckAcquireFailure { - return err + var wg sync.WaitGroup + wg.Add(1) + go func() { + // Check for existance of a cache and check + // if the element is in the cache. + if sc.cache != nil { + _, err := sc.getFromCache(lockservice.ObjectDescriptor{ObjectID: d.ID()}) + // Since there can be cache errors, we have this double check. + // We need to exit if a cache doesn't exist but proceed if the cache + // failed in persisting this element. + if err != nil && err != lockservice.ErrCheckAcquireFailure { + errChan <- err + wg.Done() + return + } } - } - - endPoint := sc.config.IP() + ":" + sc.config.Port() + "/acquire" - // Since the cache doesn't have the element, query the server. - testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJSON, ok := json.Marshal(testData) - if ok != nil { - return ok - } - req, ok := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) - if ok != nil { - return ok - } - req.Header.Set("Content-Type", "application/json") + endPoint := sc.config.IP() + ":" + sc.config.Port() + "/acquire" + // Since the cache doesn't have the element, query the server. + testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} + requestJSON, err := json.Marshal(testData) + if err != nil { + errChan <- err + wg.Done() + return + } - client := &http.Client{} - resp, ok := client.Do(req) - if ok != nil { - return ok - } - defer resp.Body.Close() + req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) + if err != nil { + errChan <- err + wg.Done() + return + } + req.Header.Set("Content-Type", "application/json") - body, ok := ioutil.ReadAll(resp.Body) - if ok != nil { - return ok - } - if resp.StatusCode != 200 { - return errors.New(strings.TrimSpace(string(body))) - } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + errChan <- err + wg.Done() + return + } + defer resp.Body.Close() - if sc.cache != nil { - err := sc.addToCache(d) + body, err := ioutil.ReadAll(resp.Body) if err != nil { - return err + errChan <- err + wg.Done() + return } - } - return nil + if resp.StatusCode != 200 { + errChan <- errors.New(strings.TrimSpace(string(body))) + wg.Done() + return + } + + if sc.cache != nil { + err := sc.addToCache(d) + if err != nil { + errChan <- err + wg.Done() + return + } + } + wg.Done() + }() + + go func() { + wg.Wait() + errChan <- nil + }() + + return <-errChan } // Release makes an HTTP call to the lockserver and releases the lock. @@ -219,54 +244,75 @@ func (sc *SimpleClient) Release(d lockservice.Object, s session.Session) error { // TODO: Cache invalidation func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) (err error) { + errChan := make(chan error, 1) if ctx != nil { go func() { for { select { case <-ctx.Done(): - err = SessionExpired - return + errChan <- SessionExpired default: } } }() } - endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" - data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJSON, ok := json.Marshal(data) - if ok != nil { - return ok - } - - req, ok := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) - if ok != nil { - return ok - } - req.Header.Set("Content-Type", "application/json") + var wg sync.WaitGroup + wg.Add(1) + go func() { + endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" + data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} + requestJSON, err := json.Marshal(data) + if err != nil { + errChan <- err + wg.Done() + return + } - client := &http.Client{} - resp, ok := client.Do(req) - if ok != nil { - return ok - } - defer resp.Body.Close() + req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) + if err != nil { + errChan <- err + wg.Done() + return + } + req.Header.Set("Content-Type", "application/json") - body, ok := ioutil.ReadAll(resp.Body) - if ok != nil { - return ok - } - if resp.StatusCode != 200 { - return lockservice.Error(strings.TrimSpace(string(body))) - } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + errChan <- err + wg.Done() + return + } + defer resp.Body.Close() - if sc.cache != nil { - err := sc.releaseFromCache(d) + body, err := ioutil.ReadAll(resp.Body) if err != nil { - return err + errChan <- err + wg.Done() + return } - } - return nil + if resp.StatusCode != 200 { + errChan <- lockservice.Error(strings.TrimSpace(string(body))) + } + + if sc.cache != nil { + err := sc.releaseFromCache(d) + if err != nil { + errChan <- err + wg.Done() + return + } + } + wg.Done() + }() + + go func() { + wg.Wait() + errChan <- nil + }() + + return <-errChan } // StartService starts the lockservice LocKey. From 229bca21fd3199d17b2b15809bbe24c41f28917d Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Tue, 13 Oct 2020 15:40:48 +0530 Subject: [PATCH 06/13] this commit completes implementation of #24 --- internal/lockclient/cache/lru_cache.go | 4 ++ internal/lockclient/errors.go | 2 +- internal/lockclient/simple_client.go | 86 ++++++++++++++++++----- internal/lockclient/simple_client_test.go | 30 ++++++-- 4 files changed, 97 insertions(+), 25 deletions(-) diff --git a/internal/lockclient/cache/lru_cache.go b/internal/lockclient/cache/lru_cache.go index c908ab5..8983a5b 100644 --- a/internal/lockclient/cache/lru_cache.go +++ b/internal/lockclient/cache/lru_cache.go @@ -213,9 +213,13 @@ func (lru *LRUCache) deleteElementFromMap(key interface{}) error { } return nil } + +// printMap prints the LRU map and is concurrency safe. func (lru *LRUCache) printMap() { + lru.mu.Lock() for k, v := range lru.m { fmt.Printf("Key: %v, Value: ", k) fmt.Printf("LN: %v, RN: %v, NodeKey: %v\n", v.Left(), v.Right(), v.Key()) } + lru.mu.Unlock() } diff --git a/internal/lockclient/errors.go b/internal/lockclient/errors.go index a69b178..5a7a371 100644 --- a/internal/lockclient/errors.go +++ b/internal/lockclient/errors.go @@ -9,5 +9,5 @@ func (e Error) Error() string { return string(e) } // Rule of thumb, all errors start with a small letter and end with no full stop. const ( ErrSessionInexistent = Error("the session related to this process doesn't exist") - SessionExpired = Error("session expired") + ErrSessionExpired = Error("session expired") ) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index e13e893..b4cb2c8 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -12,6 +12,7 @@ import ( "time" "github.com/SystemBuilders/LocKey/internal/lockclient/id" + "github.com/rs/zerolog" "github.com/SystemBuilders/LocKey/internal/lockclient/cache" "github.com/SystemBuilders/LocKey/internal/lockclient/session" @@ -26,6 +27,7 @@ type SimpleClient struct { cache *cache.LRUCache mu sync.Mutex id id.ID + log zerolog.Logger // sessions holds the mapping of a process to a session. sessions map[id.ID]session.Session @@ -40,7 +42,7 @@ type SimpleClient struct { // NewSimpleClient returns a new SimpleClient of the given parameters. // This client works with or without the existance of a cache. -func NewSimpleClient(config *lockservice.SimpleConfig, cache *cache.LRUCache) *SimpleClient { +func NewSimpleClient(config *lockservice.SimpleConfig, log zerolog.Logger, cache *cache.LRUCache) *SimpleClient { clientID := id.Create() sessions := make(map[id.ID]session.Session) sessionTimers := make(map[id.ID]chan struct{}) @@ -49,6 +51,7 @@ func NewSimpleClient(config *lockservice.SimpleConfig, cache *cache.LRUCache) *S config: config, cache: cache, id: clientID, + log: log, sessions: sessions, sessionTimers: sessionTimers, sessionAcquisitions: sessionAcquisitions, @@ -65,6 +68,10 @@ func (sc *SimpleClient) Connect() session.Session { session := session.NewSession(sessionID, sc.id, processID) sc.sessions[processID] = session sc.startSession(processID) + sc.log. + Debug(). + Str(processID.String(), "connected"). + Msg("session created") return session } @@ -75,21 +82,34 @@ func (sc *SimpleClient) Connect() session.Session { // All locks acquired during the session will be revoked if the session // expires. func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { + sc.mu.Lock() if _, ok := sc.sessions[s.ProcessID()]; !ok { + sc.mu.Unlock() return ErrSessionInexistent } + sc.mu.Unlock() ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + close := make(chan struct{}, 1) go func() { for { sc.mu.Lock() select { case <-sc.sessionTimers[s.ProcessID()]: cancel() + sc.log. + Debug(). + Str(s.ProcessID().String(), "user process"). + Msg("session expired, starting graceful shutdown") + sc.mu.Unlock() sc.gracefulSessionShutDown(s.ProcessID()) + return + case <-close: + sc.mu.Unlock() + return default: + sc.mu.Unlock() } - sc.mu.Unlock() } }() ld := lockservice.NewLockDescriptor(d.ID(), s.ProcessID().String()) @@ -101,6 +121,7 @@ func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { sc.mu.Lock() sc.sessionAcquisitions[s.ProcessID()] = append(sc.sessionAcquisitions[s.ProcessID()], ld) sc.mu.Unlock() + close <- struct{}{} return nil } @@ -111,6 +132,9 @@ func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { // // This function doesn't care about sessions or ordering of the user processes and // thus can be used for book-keeping purposes using a nil context. +// +// To avoid a race condition by returning errors from the goroutine and the +// acquire functionality, a channel is used to capture the errors. func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) error { errChan := make(chan error, 1) @@ -119,7 +143,7 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) for { select { case <-ctx.Done(): - errChan <- SessionExpired + errChan <- ErrSessionExpired default: } } @@ -149,14 +173,13 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) requestJSON, err := json.Marshal(testData) if err != nil { errChan <- err - wg.Done() + // wg.Done() return } req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) if err != nil { errChan <- err - wg.Done() return } req.Header.Set("Content-Type", "application/json") @@ -165,7 +188,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) resp, err := client.Do(req) if err != nil { errChan <- err - wg.Done() return } defer resp.Body.Close() @@ -173,12 +195,10 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) body, err := ioutil.ReadAll(resp.Body) if err != nil { errChan <- err - wg.Done() return } if resp.StatusCode != 200 { errChan <- errors.New(strings.TrimSpace(string(body))) - wg.Done() return } @@ -186,7 +206,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) err := sc.addToCache(d) if err != nil { errChan <- err - wg.Done() return } } @@ -194,6 +213,9 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) }() go func() { + // This goroutine waits to check whether the acquire goroutine is + // done. Once it returned, a nil is passed to the channel indicating + // an error free process. wg.Wait() errChan <- nil }() @@ -207,21 +229,34 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) // Only if there is an active session by the user process, it can release the locks // once verified that the locks belong to the user process. func (sc *SimpleClient) Release(d lockservice.Object, s session.Session) error { + sc.mu.Lock() if _, ok := sc.sessions[s.ProcessID()]; !ok { + sc.mu.Unlock() return ErrSessionInexistent } + sc.mu.Unlock() ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + close := make(chan struct{}, 1) go func() { for { sc.mu.Lock() select { case <-sc.sessionTimers[s.ProcessID()]: cancel() + sc.log. + Debug(). + Str(s.ProcessID().String(), "user process"). + Msg("session expired, starting graceful shutdown") + sc.mu.Unlock() sc.gracefulSessionShutDown(s.ProcessID()) + return + case <-close: + sc.mu.Unlock() + return default: + sc.mu.Unlock() } - sc.mu.Unlock() } }() ld := lockservice.NewLockDescriptor(d.ID(), s.ProcessID().String()) @@ -231,6 +266,7 @@ func (sc *SimpleClient) Release(d lockservice.Object, s session.Session) error { } // Remove the descriptor that was released. sc.removeFromSlice(s.ProcessID(), ld) + close <- struct{}{} return nil } @@ -250,7 +286,7 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) for { select { case <-ctx.Done(): - errChan <- SessionExpired + errChan <- ErrSessionExpired default: } } @@ -265,14 +301,12 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) requestJSON, err := json.Marshal(data) if err != nil { errChan <- err - wg.Done() return } req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) if err != nil { errChan <- err - wg.Done() return } req.Header.Set("Content-Type", "application/json") @@ -281,7 +315,6 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) resp, err := client.Do(req) if err != nil { errChan <- err - wg.Done() return } defer resp.Body.Close() @@ -289,18 +322,16 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) body, err := ioutil.ReadAll(resp.Body) if err != nil { errChan <- err - wg.Done() return } if resp.StatusCode != 200 { errChan <- lockservice.Error(strings.TrimSpace(string(body))) + return } if sc.cache != nil { - err := sc.releaseFromCache(d) if err != nil { errChan <- err - wg.Done() return } } @@ -414,6 +445,9 @@ func (sc *SimpleClient) releaseFromCache(d lockservice.Descriptors) error { // object in the map and then ends by closing the channel created. func (sc *SimpleClient) startSession(processID id.ID) { go func(id.ID) { + sc.log.Debug(). + Str(processID.String(), "user process"). + Msg("session timer started") timerChan := make(chan struct{}, 1) sc.mu.Lock() sc.sessionTimers[processID] = timerChan @@ -424,15 +458,29 @@ func (sc *SimpleClient) startSession(processID id.ID) { sc.sessionTimers[processID] <- struct{}{} sc.mu.Unlock() close(sc.sessionTimers[processID]) + sc.mu.Lock() + delete(sc.sessionTimers, processID) + sc.mu.Unlock() + sc.log.Debug(). + Str(processID.String(), "session timed out"). + Msg("disconnected") + sc.gracefulSessionShutDown(processID) }(processID) } // gracefulSessionShutdown releases all the locks in the lockservice once the // session has ended. func (sc *SimpleClient) gracefulSessionShutDown(processID id.ID) { - for i := range sc.sessionAcquisitions[processID] { - sc.release(nil, sc.sessionAcquisitions[processID][i]) + sc.mu.Lock() + var sessionAcquisitons = sc.sessionAcquisitions[processID] + sc.mu.Unlock() + for i := range sessionAcquisitons { + sc.release(nil, sessionAcquisitons[i]) } + sc.mu.Lock() + delete(sc.sessions, processID) + delete(sc.sessionAcquisitions, processID) + sc.mu.Unlock() } func (sc *SimpleClient) removeFromSlice(processID id.ID, d lockservice.Descriptors) { diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index aa1c6c1..2543e5a 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -42,7 +42,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, cache) + sc := NewSimpleClient(scfg, log, cache) session := sc.Connect() @@ -76,7 +76,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test, acquire test, release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, cache) + sc := NewSimpleClient(scfg, log, cache) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") @@ -104,7 +104,7 @@ func TestLockService(t *testing.T) { t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { size := 2 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, cache) + sc := NewSimpleClient(scfg, log, cache) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") @@ -143,6 +143,26 @@ func TestLockService(t *testing.T) { } }) + t.Run("acquire test and release after session expiry", func(t *testing.T) { + sc := NewSimpleClient(scfg, log, nil) + session := sc.Connect() + d := lockservice.NewObjectDescriptor("test3") + + got := sc.Acquire(d, session) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + // Wait for the session to expire + time.Sleep(500 * time.Millisecond) + got = sc.Release(d, session) + want = ErrSessionInexistent + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) + quit <- true return } @@ -168,7 +188,7 @@ func BenchmarkLocKeyWithoutCache(b *testing.B) { }() time.Sleep(100 * time.Millisecond) - sc := NewSimpleClient(scfg, nil) + sc := NewSimpleClient(scfg, log, nil) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { @@ -208,7 +228,7 @@ func BenchmarkLocKeyWithCache(b *testing.B) { size := 5 cache := cache.NewLRUCache(size) - sc := NewSimpleClient(scfg, cache) + sc := NewSimpleClient(scfg, log, cache) session := sc.Connect() d := lockservice.NewObjectDescriptor("test") for n := 0; n < b.N; n++ { From 14f02167fc12d86e92d6ad7ff172e1061be40331 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Tue, 13 Oct 2020 15:41:48 +0530 Subject: [PATCH 07/13] this commit completes implementation of #24 --- internal/lockclient/simple_client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index b4cb2c8..8532837 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -162,7 +162,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) // failed in persisting this element. if err != nil && err != lockservice.ErrCheckAcquireFailure { errChan <- err - wg.Done() return } } @@ -173,7 +172,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) requestJSON, err := json.Marshal(testData) if err != nil { errChan <- err - // wg.Done() return } From 55c929003e833dad58488c42698bccc222fe8867 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Tue, 13 Oct 2020 15:46:39 +0530 Subject: [PATCH 08/13] this commit fixes a possible race condition --- internal/lockclient/simple_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 8532837..8ae6281 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -452,13 +452,13 @@ func (sc *SimpleClient) startSession(processID id.ID) { sc.mu.Unlock() // Sessions last for 200ms. time.Sleep(200 * time.Millisecond) + sc.mu.Lock() sc.sessionTimers[processID] <- struct{}{} - sc.mu.Unlock() close(sc.sessionTimers[processID]) - sc.mu.Lock() delete(sc.sessionTimers, processID) sc.mu.Unlock() + sc.log.Debug(). Str(processID.String(), "session timed out"). Msg("disconnected") From 03ac63f05155aa3d90ef2fd5caaf93e7ddc36dde Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Tue, 13 Oct 2020 19:53:38 +0530 Subject: [PATCH 09/13] this commit removes the unnecessary wait group --- internal/lockclient/simple_client.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 8ae6281..7ca8db4 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -150,8 +150,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) }() } - var wg sync.WaitGroup - wg.Add(1) go func() { // Check for existance of a cache and check // if the element is in the cache. @@ -207,14 +205,6 @@ func (sc *SimpleClient) acquire(ctx context.Context, d lockservice.Descriptors) return } } - wg.Done() - }() - - go func() { - // This goroutine waits to check whether the acquire goroutine is - // done. Once it returned, a nil is passed to the channel indicating - // an error free process. - wg.Wait() errChan <- nil }() @@ -285,14 +275,13 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) select { case <-ctx.Done(): errChan <- ErrSessionExpired + return default: } } }() } - var wg sync.WaitGroup - wg.Add(1) go func() { endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" data := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} @@ -333,11 +322,6 @@ func (sc *SimpleClient) release(ctx context.Context, d lockservice.Descriptors) return } } - wg.Done() - }() - - go func() { - wg.Wait() errChan <- nil }() From 9f638235e0273c41a16f6792b55627f8b93a2f59 Mon Sep 17 00:00:00 2001 From: Sumukha Pk Date: Sun, 8 Nov 2020 08:46:26 +0530 Subject: [PATCH 10/13] Update internal/lockclient/simple_client.go --- internal/lockclient/simple_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 7ca8db4..f504ea3 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -85,7 +85,7 @@ func (sc *SimpleClient) Acquire(d lockservice.Object, s session.Session) error { sc.mu.Lock() if _, ok := sc.sessions[s.ProcessID()]; !ok { sc.mu.Unlock() - return ErrSessionInexistent + return ErrSessionNonExistent } sc.mu.Unlock() ctx := context.Background() From de42353e1726be2a13ba2dbb35755007ce365e62 Mon Sep 17 00:00:00 2001 From: Sumukha Pk Date: Sun, 8 Nov 2020 08:46:40 +0530 Subject: [PATCH 11/13] Update internal/lockclient/errors.go --- internal/lockclient/errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/lockclient/errors.go b/internal/lockclient/errors.go index 5a7a371..025a208 100644 --- a/internal/lockclient/errors.go +++ b/internal/lockclient/errors.go @@ -8,6 +8,6 @@ func (e Error) Error() string { return string(e) } // Constant errors. // Rule of thumb, all errors start with a small letter and end with no full stop. const ( - ErrSessionInexistent = Error("the session related to this process doesn't exist") + ErrSessionNonExistent = Error("the session related to this process doesn't exist") ErrSessionExpired = Error("session expired") ) From 61736c754c8ffff2e82ecdfd2e70646514899bce Mon Sep 17 00:00:00 2001 From: Sumukha Pk Date: Sun, 8 Nov 2020 08:46:50 +0530 Subject: [PATCH 12/13] Update internal/lockclient/simple_client.go --- internal/lockclient/simple_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index f504ea3..411e997 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -220,7 +220,7 @@ func (sc *SimpleClient) Release(d lockservice.Object, s session.Session) error { sc.mu.Lock() if _, ok := sc.sessions[s.ProcessID()]; !ok { sc.mu.Unlock() - return ErrSessionInexistent + return ErrSessionNonExistent } sc.mu.Unlock() ctx := context.Background() From 48ef72bd7dca386c734f4f9cb79530dbcf0a4208 Mon Sep 17 00:00:00 2001 From: Sumukha Pk Date: Sun, 8 Nov 2020 08:49:01 +0530 Subject: [PATCH 13/13] Update internal/lockclient/simple_client_test.go --- internal/lockclient/simple_client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index 2543e5a..ef5e36d 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -157,7 +157,7 @@ func TestLockService(t *testing.T) { // Wait for the session to expire time.Sleep(500 * time.Millisecond) got = sc.Release(d, session) - want = ErrSessionInexistent + want = ErrSessionNonExistent if got != want { t.Errorf("release: got %q want %q", got, want) }