From c470c7634b0c604a38bdab4075e24bc411d98f07 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:23:41 -0400 Subject: [PATCH 1/5] adding retry for contested locks and an operation timeout --- cacheaside.go | 66 ++++++++++++++++++++++++++++++++++------------ cacheaside_test.go | 30 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index b178198..9675a26 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -17,22 +17,30 @@ import ( ) type CacheAside struct { - client rueidis.Client - locks syncx.Map[string, chan struct{}] - lockTTL time.Duration + client rueidis.Client + locks syncx.Map[string, chan struct{}] + lockTTL time.Duration + operationTTL time.Duration } type CacheAsideOption struct { LockTTL time.Duration + // OperationTTL is the maximum time to wait for cache operations to complete. + // If zero, defaults to 10 seconds. This is separate from cache entry TTL. + OperationTTL time.Duration } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } + if caOption.OperationTTL == 0 { + caOption.OperationTTL = 10 * time.Second + } rca := &CacheAside{ - lockTTL: caOption.LockTTL, + lockTTL: caOption.LockTTL, + operationTTL: caOption.OperationTTL, } clientOption.OnInvalidations = rca.onInvalidate client, err := rueidis.NewClient(clientOption) @@ -60,8 +68,9 @@ func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { const prefix = "redcache:" var ( - delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) - setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) + delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) + setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) + errLockWaitTimeout = errors.New("lock wait timeout - retrying") ) func (rca *CacheAside) register(key string) <-chan struct{} { @@ -75,7 +84,7 @@ func (rca *CacheAside) Get( key string, fn func(ctx context.Context, key string) (val string, err error), ) (string, error) { - ctx, cancel := context.WithTimeout(ctx, ttl) + ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) defer cancel() retry: wait := rca.register(key) @@ -98,11 +107,22 @@ retry: } if val == "" { + // Wait for lock release with a timeout to handle lost invalidation messages + // Use lockTTL as the wait timeout since that's the max time a lock can be held + waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) + defer cancel() + select { case <-wait: goto retry - case <-ctx.Done(): - return "", ctx.Err() + case <-waitCtx.Done(): + // Check if it's our timeout or parent context cancellation + if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { + // Our timeout - retry to check Redis again + goto retry + } + // Parent context cancelled - propagate the error + return "", context.Cause(waitCtx) } } @@ -152,7 +172,8 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key if !setVal { toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) defer cancel() - rca.unlock(toCtx, key, lockVal) + // Best effort unlock - errors are non-fatal as lock will expire + _ = rca.unlock(toCtx, key, lockVal) } }() if val, err = fn(ctx, key); err == nil { @@ -192,8 +213,8 @@ func (rca *CacheAside) setWithLock(ctx context.Context, ttl time.Duration, key s return valLock.val, nil } -func (rca *CacheAside) unlock(ctx context.Context, key string, lock string) { - delKeyLua.Exec(ctx, rca.client, []string{key}, []string{lock}) +func (rca *CacheAside) unlock(ctx context.Context, key string, lock string) error { + return delKeyLua.Exec(ctx, rca.client, []string{key}, []string{lock}).Error() } func (rca *CacheAside) GetMulti( @@ -203,7 +224,7 @@ func (rca *CacheAside) GetMulti( fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - ctx, cancel := context.WithTimeout(ctx, ttl) + ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) defer cancel() res := make(map[string]string, len(keys)) @@ -238,9 +259,19 @@ retry: } if len(waitLock) > 0 { - err = syncx.WaitForAll(ctx, mapsx.Values(waitLock)) + // Wait for lock releases with a timeout to handle lost invalidation messages + waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) + err = syncx.WaitForAll(waitCtx, mapsx.Values(waitLock)) + cancel() + + // Check what kind of error occurred if err != nil { - return nil, err + // If it's our timeout, retry to check Redis again + if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { + goto retry + } + // Parent context cancelled - propagate the error + return nil, context.Cause(waitCtx) } goto retry } @@ -406,7 +437,7 @@ func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, } continue } - keyByStmt[ii] = append(out, kos.keyOrder[j]) + keyByStmt[ii] = append(keyByStmt[ii], kos.keyOrder[j]) } return nil }) @@ -438,7 +469,8 @@ func (rca *CacheAside) unlockMulti(ctx context.Context, lockVals map[string]stri wg.Add(1) go func() { defer wg.Done() - delKeyLua.ExecMulti(ctx, rca.client, stmts...) + // Best effort unlock - errors are non-fatal as locks will expire + _ = delKeyLua.ExecMulti(ctx, rca.client, stmts...) }() } wg.Wait() diff --git a/cacheaside_test.go b/cacheaside_test.go index f96ffac..cf04b56 100644 --- a/cacheaside_test.go +++ b/cacheaside_test.go @@ -621,3 +621,33 @@ func TestCacheAside_DelMulti(t *testing.T) { require.True(t, rueidis.IsRedisNil(err)) } } + +func TestCacheAside_GetParentContextCancellation(t *testing.T) { + client := makeClient(t, addr) + defer client.Client().Close() + + ctx, cancel := context.WithCancel(context.Background()) + key := "key:" + uuid.New().String() + val := "val:" + uuid.New().String() + + // Set a lock on the key so Get will wait + innerClient := client.Client() + lockVal := "redcache:" + uuid.New().String() + err := innerClient.Do(context.Background(), innerClient.B().Set().Key(key).Value(lockVal).Nx().Get().Px(time.Second*30).Build()).Error() + require.True(t, rueidis.IsRedisNil(err)) + + // Cancel the parent context after a short delay + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + cb := func(ctx context.Context, key string) (string, error) { + return val, nil + } + + // Should get parent context cancelled error, not a timeout + _, err = client.Get(ctx, time.Second*10, key, cb) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) +} From 1c97bb42a2634ee9f009e2334bcc57ff70ec64d7 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:26:41 -0400 Subject: [PATCH 2/5] remove operation timeout --- cacheaside.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index 9675a26..1440dc2 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -17,30 +17,24 @@ import ( ) type CacheAside struct { - client rueidis.Client - locks syncx.Map[string, chan struct{}] - lockTTL time.Duration - operationTTL time.Duration + client rueidis.Client + locks syncx.Map[string, chan struct{}] + lockTTL time.Duration } type CacheAsideOption struct { + // LockTTL is the maximum time a lock can be held, and also the timeout for waiting + // on locks when handling lost Redis invalidation messages. Defaults to 10 seconds. LockTTL time.Duration - // OperationTTL is the maximum time to wait for cache operations to complete. - // If zero, defaults to 10 seconds. This is separate from cache entry TTL. - OperationTTL time.Duration } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } - if caOption.OperationTTL == 0 { - caOption.OperationTTL = 10 * time.Second - } rca := &CacheAside{ - lockTTL: caOption.LockTTL, - operationTTL: caOption.OperationTTL, + lockTTL: caOption.LockTTL, } clientOption.OnInvalidations = rca.onInvalidate client, err := rueidis.NewClient(clientOption) @@ -84,8 +78,6 @@ func (rca *CacheAside) Get( key string, fn func(ctx context.Context, key string) (val string, err error), ) (string, error) { - ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) - defer cancel() retry: wait := rca.register(key) val, err := rca.tryGet(ctx, ttl, key) @@ -224,9 +216,6 @@ func (rca *CacheAside) GetMulti( fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) - defer cancel() - res := make(map[string]string, len(keys)) waitLock := make(map[string]<-chan struct{}, len(keys)) From 80088b70bb5bdb5c0563e2ae36dd7a33420a6eda Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:42:57 -0400 Subject: [PATCH 3/5] use context to ensure cancellation --- cacheaside.go | 82 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index 1440dc2..fbd5a2f 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -16,9 +16,14 @@ import ( "golang.org/x/sync/errgroup" ) +type lockEntry struct { + ctx context.Context + cancel context.CancelFunc +} + type CacheAside struct { client rueidis.Client - locks syncx.Map[string, chan struct{}] + locks syncx.Map[string, *lockEntry] lockTTL time.Duration } @@ -52,9 +57,9 @@ func (rca *CacheAside) Client() rueidis.Client { func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { for _, m := range messages { key, _ := m.ToString() - ch, loaded := rca.locks.LoadAndDelete(key) + entry, loaded := rca.locks.LoadAndDelete(key) if loaded { - close(ch) + entry.cancel() // Cancel context, which closes the channel } } } @@ -62,14 +67,42 @@ func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { const prefix = "redcache:" var ( - delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) - setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) - errLockWaitTimeout = errors.New("lock wait timeout - retrying") + delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) + setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) ) func (rca *CacheAside) register(key string) <-chan struct{} { - ch, _ := rca.locks.LoadOrStore(key, make(chan struct{})) - return ch + // Try to load existing entry first + if entry, loaded := rca.locks.Load(key); loaded { + // Check if the context is still active (not cancelled/timed out) + select { + case <-entry.ctx.Done(): + // Context is done - clean it up and create a new one + rca.locks.Delete(key) + default: + // Context is still active - use it + return entry.ctx.Done() + } + } + entry. + // Create new entry with context that auto-cancels after lockTTL + ctx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) + + entry := &lockEntry{ + ctx: ctx, + cancel: cancel, + } + + // Store or get existing entry atomically + actual, _ := rca.locks.LoadOrStore(key, entry) + + // If another goroutine stored first, cancel our context and use theirs + if actual != entry { + cancel() + return actual.ctx.Done() + } + + return ctx.Done() } func (rca *CacheAside) Get( @@ -99,22 +132,13 @@ retry: } if val == "" { - // Wait for lock release with a timeout to handle lost invalidation messages - // Use lockTTL as the wait timeout since that's the max time a lock can be held - waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) - defer cancel() - + // Wait for lock release (channel auto-closes after lockTTL or on invalidation) select { case <-wait: goto retry - case <-waitCtx.Done(): - // Check if it's our timeout or parent context cancellation - if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { - // Our timeout - retry to check Redis again - goto retry - } - // Parent context cancelled - propagate the error - return "", context.Cause(waitCtx) + case <-ctx.Done(): + // Parent context cancelled + return "", ctx.Err() } } @@ -248,19 +272,11 @@ retry: } if len(waitLock) > 0 { - // Wait for lock releases with a timeout to handle lost invalidation messages - waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) - err = syncx.WaitForAll(waitCtx, mapsx.Values(waitLock)) - cancel() - - // Check what kind of error occurred + // Wait for lock releases (channels auto-close after lockTTL or on invalidation) + err = syncx.WaitForAll(ctx, mapsx.Values(waitLock)) if err != nil { - // If it's our timeout, retry to check Redis again - if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { - goto retry - } - // Parent context cancelled - propagate the error - return nil, context.Cause(waitCtx) + // Parent context cancelled + return nil, ctx.Err() } goto retry } From cfde17d5eeacc2802f65579c6a916d6ce618871a Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 15:00:37 -0400 Subject: [PATCH 4/5] Update cacheaside.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cacheaside.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cacheaside.go b/cacheaside.go index dad0090..56ea442 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -282,7 +282,7 @@ retry: // Wait for lock releases (channels auto-close after lockTTL or on invalidation) err = syncx.WaitForAll(ctx, maps.Values(waitLock), len(waitLock)) if err != nil { - // Parent context cancelled + // Parent context cancelled or deadline exceeded return nil, ctx.Err() } goto retry From 3741e78f7f348c63323baa87861a7934c70e1904 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 16:26:42 -0400 Subject: [PATCH 5/5] fixed raw deletion with compare and delete --- cacheaside.go | 63 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index dad0090..afd77d6 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -4,6 +4,7 @@ import ( "context" "errors" "iter" + "log/slog" "maps" "strconv" "strings" @@ -23,10 +24,15 @@ type lockEntry struct { cancel context.CancelFunc } +type Logger interface { + Error(msg string, args ...any) +} + type CacheAside struct { client rueidis.Client locks syncx.Map[string, *lockEntry] lockTTL time.Duration + logger Logger } type CacheAsideOption struct { @@ -34,6 +40,8 @@ type CacheAsideOption struct { // on locks when handling lost Redis invalidation messages. Defaults to 10 seconds. LockTTL time.Duration ClientBuilder func(option rueidis.ClientOption) (rueidis.Client, error) + // Logger for logging non-fatal errors. Defaults to slog.Default(). + Logger Logger } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { @@ -41,9 +49,13 @@ func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOpti if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } + if caOption.Logger == nil { + caOption.Logger = slog.Default() + } rca := &CacheAside{ lockTTL: caOption.LockTTL, + logger: caOption.Logger, } clientOption.OnInvalidations = rca.onInvalidate if caOption.ClientBuilder != nil { @@ -79,37 +91,37 @@ var ( ) func (rca *CacheAside) register(key string) <-chan struct{} { - // Try to load existing entry first - if entry, loaded := rca.locks.Load(key); loaded { - // Check if the context is still active (not cancelled/timed out) - select { - case <-entry.ctx.Done(): - // Context is done - clean it up and create a new one - rca.locks.Delete(key) - default: - // Context is still active - use it - return entry.ctx.Done() - } - } - +retry: // Create new entry with context that auto-cancels after lockTTL ctx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) - entry := &lockEntry{ + newEntry := &lockEntry{ ctx: ctx, cancel: cancel, } // Store or get existing entry atomically - actual, _ := rca.locks.LoadOrStore(key, entry) + actual, loaded := rca.locks.LoadOrStore(key, newEntry) - // If another goroutine stored first, cancel our context and use theirs - if actual != entry { - cancel() - return actual.ctx.Done() + // If we successfully stored, return our context + if !loaded { + return ctx.Done() } - return ctx.Done() + // Another goroutine stored first, cancel our context + cancel() + + // Check if their context is still active (not cancelled/timed out) + select { + case <-actual.ctx.Done(): + // Context is done - try to atomically delete it and retry + // If CompareAndDelete fails, another goroutine already replaced it + rca.locks.CompareAndDelete(key, actual) + goto retry + default: + // Context is still active - use it + return actual.ctx.Done() + } } func (rca *CacheAside) Get( @@ -196,7 +208,9 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) defer cancel() // Best effort unlock - errors are non-fatal as lock will expire - _ = rca.unlock(toCtx, key, lockVal) + if err := rca.unlock(toCtx, key, lockVal); err != nil { + rca.logger.Error("failed to unlock key", "key", key, "error", err) + } } }() if val, err = fn(ctx, key); err == nil { @@ -482,7 +496,12 @@ func (rca *CacheAside) unlockMulti(ctx context.Context, lockVals map[string]stri go func() { defer wg.Done() // Best effort unlock - errors are non-fatal as locks will expire - _ = delKeyLua.ExecMulti(ctx, rca.client, stmts...) + resps := delKeyLua.ExecMulti(ctx, rca.client, stmts...) + for _, resp := range resps { + if err := resp.Error(); err != nil { + rca.logger.Error("failed to unlock key in batch", "error", err) + } + } }() } wg.Wait()