From 3c8372836d2c55b459ff30a37f4c1d78d389abd9 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 13 Jan 2020 17:25:09 -0800 Subject: [PATCH 01/12] Initial commit for TTL feature. --- cache.go | 45 +++++++++++++++++++++++++++++++----------- store.go | 58 +++++++++++++++++++++++++++++++++++++----------------- ttl.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 29 deletions(-) create mode 100644 ttl.go diff --git a/cache.go b/cache.go index 7f748930..3c100267 100644 --- a/cache.go +++ b/cache.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync/atomic" + "time" "github.com/dgraph-io/ristretto/z" ) @@ -115,11 +116,12 @@ const ( // item is passed to setBuf so items can eventually be added to the cache type item struct { - flag itemFlag - key uint64 - conflict uint64 - value interface{} - cost int64 + flag itemFlag + key uint64 + conflict uint64 + value interface{} + cost int64 + expiration time.Time } // NewCache returns a new Cache instance and any configuration errors, if any. @@ -153,6 +155,7 @@ func NewCache(config *Config) (*Cache, error) { // goroutines we have running cache.processItems(), so 1 should // usually be sufficient go cache.processItems() + go cache.cleanupItems() return cache, nil } @@ -184,16 +187,32 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) { // the cost parameter to 0 and Coster will be ran when needed in order to find // the items true cost. func (c *Cache) Set(key, value interface{}, cost int64) bool { + return c.SetWithTTL(key, value, cost, 0*time.Second) +} + +// SetWithTTL works like Set but adds a key-value pair to the cache that will expire +// after the specified TTL (time to live) has passed. A zero or negative value will +// cause the value to never expire, which is identical to calling Set. +func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool { if c == nil || key == nil { return false } + + now := time.Now() + expiration := now.Add(ttl) + if !expiration.After(now) { + // The TTL is either zero or negative. Treat this item as one without a TTL. + expiration = time.Time{} + } + keyHash, conflictHash := c.keyToHash(key) i := &item{ - flag: itemNew, - key: keyHash, - conflict: conflictHash, - value: value, - cost: cost, + flag: itemNew, + key: keyHash, + conflict: conflictHash, + value: value, + cost: cost, + expiration: expiration, } // attempt to immediately update hashmap value and set flag to update so the // cost is eventually updated @@ -264,7 +283,7 @@ func (c *Cache) processItems() { case itemNew: victims, added := c.policy.Add(i.key, i.cost) if added { - c.store.Set(i.key, i.conflict, i.value) + c.store.Set(i) c.Metrics.add(keyAdd, i.key, 1) } for _, victim := range victims { @@ -287,6 +306,10 @@ func (c *Cache) processItems() { } } +func (c *Cache) cleanupItems() { + +} + // collectMetrics just creates a new *Metrics instance and adds the pointers // to the cache and policy instances. func (c *Cache) collectMetrics() { diff --git a/store.go b/store.go index 3f3fbce4..4a0d1ee9 100644 --- a/store.go +++ b/store.go @@ -18,12 +18,14 @@ package ristretto import ( "sync" + "time" ) type storeItem struct { - key uint64 - conflict uint64 - value interface{} + key uint64 + conflict uint64 + value interface{} + expiration time.Time } // store is the interface fulfilled by all hash map implementations in this @@ -36,8 +38,9 @@ type store interface { // Get returns the value associated with the key parameter. Get(uint64, uint64) (interface{}, bool) // Set adds the key-value pair to the Map or updates the value if it's - // already present. - Set(uint64, uint64, interface{}) + // already present. The key-value pair is passed as a pointer to an + // item object. + Set(*item) // Del deletes the key-value pair from the Map. Del(uint64, uint64) (uint64, interface{}) // Update attempts to update the key with a new value and returns true if @@ -72,8 +75,13 @@ func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) { return sm.shards[key%numShards].Get(key, conflict) } -func (sm *shardedMap) Set(key, conflict uint64, value interface{}) { - sm.shards[key%numShards].Set(key, conflict, value) +func (sm *shardedMap) Set(i *item) { + if i == nil { + // If item is nil make this Set a no-op. + return + } + + sm.shards[i.key%numShards].Set(i) } func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) { @@ -111,29 +119,43 @@ func (m *lockedMap) Get(key, conflict uint64) (interface{}, bool) { if conflict != 0 && (conflict != item.conflict) { return nil, false } + + // Handle expired items. + if !item.expiration.IsZero() { + if item.expiration.After(time.Now()) { + return nil, false + } + } return item.value, true } -func (m *lockedMap) Set(key, conflict uint64, value interface{}) { +func (m *lockedMap) Set(i *item) { + if i == nil { + // If the item is nil make this Set a no-op. + return + } + m.Lock() - item, ok := m.data[key] + item, ok := m.data[i.key] if !ok { - m.data[key] = storeItem{ - key: key, - conflict: conflict, - value: value, + m.data[i.key] = storeItem{ + key: i.key, + conflict: i.conflict, + value: i.value, + expiration: i.expiration, } m.Unlock() return } - if conflict != 0 && (conflict != item.conflict) { + if i.conflict != 0 && (i.conflict != item.conflict) { m.Unlock() return } - m.data[key] = storeItem{ - key: key, - conflict: conflict, - value: value, + m.data[i.key] = storeItem{ + key: i.key, + conflict: i.conflict, + value: i.value, + expiration: i.expiration, } m.Unlock() } diff --git a/ttl.go b/ttl.go new file mode 100644 index 00000000..84b29000 --- /dev/null +++ b/ttl.go @@ -0,0 +1,60 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ristretto + +import ( + "sync" + "time" +) + +const ( + // TODO: find the optimal value or make it configurable. + bucketSize = 5 +) + +func timeToBucket(t time.Time) int { + return t.Second() / bucketSize +} + +type bucketMap map[uint64]uint64 + +type expirationMap struct { + sync.RWMutex + m map[int]bucketMap +} + +func newExpirationMap() *expirationMap { + return &expirationMap{ + m: make(map[int]bucketMap), + } +} + +func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { + if expiration.IsZero() { + return + } + + m.Lock() + defer m.Unlock() + + bucketNum := timeToBucket(expiration) + _, ok := m.m[bucketNum] + if !ok { + m.m[bucketNum] = make(bucketMap) + } + m.m[bucketNum][key] = conflict +} From 3f0ab0c2143b6f3372af2dc4cd2dfc58eaed210d Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 14 Jan 2020 17:11:30 -0800 Subject: [PATCH 02/12] more changes. --- cache.go | 5 ----- ttl.go | 13 +++++++++++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/cache.go b/cache.go index 3c100267..292a1e7d 100644 --- a/cache.go +++ b/cache.go @@ -155,7 +155,6 @@ func NewCache(config *Config) (*Cache, error) { // goroutines we have running cache.processItems(), so 1 should // usually be sufficient go cache.processItems() - go cache.cleanupItems() return cache, nil } @@ -306,10 +305,6 @@ func (c *Cache) processItems() { } } -func (c *Cache) cleanupItems() { - -} - // collectMetrics just creates a new *Metrics instance and adds the pointers // to the cache and policy instances. func (c *Cache) collectMetrics() { diff --git a/ttl.go b/ttl.go index 84b29000..b1cf66d0 100644 --- a/ttl.go +++ b/ttl.go @@ -58,3 +58,16 @@ func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { } m.m[bucketNum][key] = conflict } + +func (m *expirationMap) Remove(key uint64, expiration time.Time) { + m.Lock() + defer m.Unlock() + + bucketNum := timeToBucket(expiration) + _, ok := m.m[bucketNum] + if !ok { + return + } + + delete(m.m[bucketNum], key) +} From d97ae8fd5b106bf205fdf3a2bea95737c1039568 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 15 Jan 2020 15:32:30 -0800 Subject: [PATCH 03/12] Initial implementation. --- cache.go | 35 ++++++++++++++++++++++++++--------- store.go | 12 ++++++++++++ ttl.go | 45 ++++++++++++++++++++++++++++++--------------- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/cache.go b/cache.go index 292a1e7d..89919934 100644 --- a/cache.go +++ b/cache.go @@ -34,6 +34,8 @@ const ( setBufSize = 32 * 1024 ) +type onEvictFunc func(uint64, uint64, interface{}, int64) + // Cache is a thread-safe implementation of a hashmap with a TinyLFU admission // policy and a Sampled LFU eviction policy. You can use the same Cache instance // from as many goroutines as you want. @@ -49,7 +51,7 @@ type Cache struct { // contention setBuf chan *item // onEvict is called for item evictions - onEvict func(uint64, uint64, interface{}, int64) + onEvict onEvictFunc // KeyToHash function is used to customize the key hashing algorithm. // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. @@ -58,6 +60,11 @@ type Cache struct { stop chan struct{} // cost calculates cost from a value cost func(value interface{}) int64 + // ttlMap is used to store information about which entries to delete once their + // expiration time has passed. + ttlMap *expirationMap + // cleanupTicker is used to periodically check for entries whose TTL has passed. + cleanupTicker *time.Ticker // Metrics contains a running log of important statistics like hits, misses, // and dropped items Metrics *Metrics @@ -136,14 +143,16 @@ func NewCache(config *Config) (*Cache, error) { } policy := newPolicy(config.NumCounters, config.MaxCost) cache := &Cache{ - store: newStore(), - policy: policy, - getBuf: newRingBuffer(policy, config.BufferItems), - setBuf: make(chan *item, setBufSize), - onEvict: config.OnEvict, - keyToHash: config.KeyToHash, - stop: make(chan struct{}), - cost: config.Cost, + store: newStore(), + policy: policy, + getBuf: newRingBuffer(policy, config.BufferItems), + setBuf: make(chan *item, setBufSize), + onEvict: config.OnEvict, + keyToHash: config.KeyToHash, + stop: make(chan struct{}), + cost: config.Cost, + ttlMap: newExpirationMap(), + cleanupTicker: time.NewTicker(bucketSizeSecs * time.Second), } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash @@ -282,7 +291,13 @@ func (c *Cache) processItems() { case itemNew: victims, added := c.policy.Add(i.key, i.cost) if added { + currentExpiration := c.store.Expiration(i.key) + if !currentExpiration.IsZero() { + c.ttlMap.Delete(i.key, currentExpiration) + } + c.store.Set(i) + c.ttlMap.Add(i.key, i.conflict, i.expiration) c.Metrics.add(keyAdd, i.key, 1) } for _, victim := range victims { @@ -299,6 +314,8 @@ func (c *Cache) processItems() { c.policy.Del(i.key) // Deals with metrics updates. c.store.Del(i.key, i.conflict) } + case <-c.cleanupTicker.C: + c.ttlMap.CleanUp(c.store, c.policy, c.onEvict) case <-c.stop: return } diff --git a/store.go b/store.go index 4a0d1ee9..78821e12 100644 --- a/store.go +++ b/store.go @@ -37,6 +37,8 @@ type storeItem struct { type store interface { // Get returns the value associated with the key parameter. Get(uint64, uint64) (interface{}, bool) + // Expiration returns the expiration time for this key. + Expiration(uint64) time.Time // Set adds the key-value pair to the Map or updates the value if it's // already present. The key-value pair is passed as a pointer to an // item object. @@ -75,6 +77,10 @@ func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) { return sm.shards[key%numShards].Get(key, conflict) } +func (sm *shardedMap) Expiration(key uint64) time.Time { + return sm.shards[key%numShards].Expiration(key) +} + func (sm *shardedMap) Set(i *item) { if i == nil { // If item is nil make this Set a no-op. @@ -129,6 +135,12 @@ func (m *lockedMap) Get(key, conflict uint64) (interface{}, bool) { return item.value, true } +func (m *lockedMap) Expiration(key uint64) time.Time { + m.RLock() + defer m.RUnlock() + return m.data[key].expiration +} + func (m *lockedMap) Set(i *item) { if i == nil { // If the item is nil make this Set a no-op. diff --git a/ttl.go b/ttl.go index b1cf66d0..d0f1c668 100644 --- a/ttl.go +++ b/ttl.go @@ -23,23 +23,24 @@ import ( const ( // TODO: find the optimal value or make it configurable. - bucketSize = 5 + bucketSizeSecs = 5 ) func timeToBucket(t time.Time) int { - return t.Second() / bucketSize + return t.Second() / bucketSizeSecs } -type bucketMap map[uint64]uint64 +// Map of key to conflict. +type bucket map[uint64]uint64 type expirationMap struct { sync.RWMutex - m map[int]bucketMap + buckets map[int]bucket } func newExpirationMap() *expirationMap { return &expirationMap{ - m: make(map[int]bucketMap), + buckets: make(map[int]bucket), } } @@ -48,26 +49,40 @@ func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { return } + bucketNum := timeToBucket(expiration) m.Lock() defer m.Unlock() - - bucketNum := timeToBucket(expiration) - _, ok := m.m[bucketNum] + _, ok := m.buckets[bucketNum] if !ok { - m.m[bucketNum] = make(bucketMap) + m.buckets[bucketNum] = make(bucket) } - m.m[bucketNum][key] = conflict + m.buckets[bucketNum][key] = conflict } -func (m *expirationMap) Remove(key uint64, expiration time.Time) { +func (m *expirationMap) Delete(key uint64, expiration time.Time) { + bucketNum := timeToBucket(expiration) m.Lock() defer m.Unlock() - - bucketNum := timeToBucket(expiration) - _, ok := m.m[bucketNum] + _, ok := m.buckets[bucketNum] if !ok { return } + delete(m.buckets[bucketNum], key) +} + +func (m *expirationMap) CleanUp(store store, policy policy, onEvict onEvictFunc) { + bucketNum := timeToBucket(time.Now()) - delete(m.m[bucketNum], key) + m.Lock() + keys := m.buckets[bucketNum] + delete(m.buckets, bucketNum) + m.Unlock() + + for _, key := range keys { + conflict, value := store.Del(key, 0) + cost := policy.Cost(key) + if onEvict != nil { + onEvict(key, conflict, value, cost) + } + } } From 3d41bf52cc7ddb9f3201df4124349b1600e41652 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 15 Jan 2020 18:10:28 -0800 Subject: [PATCH 04/12] Fix tests. --- cache_test.go | 7 +++++- store_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/cache_test.go b/cache_test.go index 914090f5..599ff7b0 100644 --- a/cache_test.go +++ b/cache_test.go @@ -243,7 +243,12 @@ func TestCacheGet(t *testing.T) { panic(err) } key, conflict := z.KeyToHash(1) - c.store.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + c.store.Set(&i) if val, ok := c.Get(1); val == nil || !ok { t.Fatal("get should be successful") } diff --git a/store_test.go b/store_test.go index 03a21344..9a338c8c 100644 --- a/store_test.go +++ b/store_test.go @@ -9,16 +9,27 @@ import ( func TestStoreSetGet(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) - s.Set(key, conflict, 2) + i := item{ + key: key, + conflict: conflict, + value: 2, + } + s.Set(&i) if val, ok := s.Get(key, conflict); (val == nil || !ok) || val.(int) != 2 { t.Fatal("set/get error") } - s.Set(key, conflict, 3) + i.value = 3 + s.Set(&i) if val, ok := s.Get(key, conflict); (val == nil || !ok) || val.(int) != 3 { t.Fatal("set/get overwrite error") } key, conflict = z.KeyToHash(2) - s.Set(key, conflict, 2) + i = item{ + key: key, + conflict: conflict, + value: 2, + } + s.Set(&i) if val, ok := s.Get(key, conflict); !ok || val.(int) != 2 { t.Fatal("set/get nil key error") } @@ -27,7 +38,12 @@ func TestStoreSetGet(t *testing.T) { func TestStoreDel(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) - s.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + s.Set(&i) s.Del(key, conflict) if val, ok := s.Get(key, conflict); val != nil || ok { t.Fatal("del error") @@ -39,7 +55,12 @@ func TestStoreClear(t *testing.T) { s := newStore() for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) - s.Set(key, conflict, i) + it := item{ + key: key, + conflict: conflict, + value: i, + } + s.Set(&it) } s.Clear() for i := uint64(0); i < 1000; i++ { @@ -53,7 +74,12 @@ func TestStoreClear(t *testing.T) { func TestStoreUpdate(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) - s.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + s.Set(&i) if updated := s.Update(key, conflict, 2); !updated { t.Fatal("value should have been updated") } @@ -90,7 +116,12 @@ func TestStoreCollision(t *testing.T) { if val, ok := s.Get(1, 1); val != nil || ok { t.Fatal("collision should return nil") } - s.Set(1, 1, 2) + i := item{ + key: 1, + conflict: 1, + value: 2, + } + s.Set(&i) if val, ok := s.Get(1, 0); !ok || val == nil || val.(int) == 2 { t.Fatal("collision should prevent Set update") } @@ -109,7 +140,12 @@ func TestStoreCollision(t *testing.T) { func BenchmarkStoreGet(b *testing.B) { s := newStore() key, conflict := z.KeyToHash(1) - s.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + s.Set(&i) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -124,7 +160,12 @@ func BenchmarkStoreSet(b *testing.B) { b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - s.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + s.Set(&i) } }) } @@ -132,7 +173,12 @@ func BenchmarkStoreSet(b *testing.B) { func BenchmarkStoreUpdate(b *testing.B) { s := newStore() key, conflict := z.KeyToHash(1) - s.Set(key, conflict, 1) + i := item{ + key: key, + conflict: conflict, + value: 1, + } + s.Set(&i) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { From 34c4bf80a5a1080c65ad1b0fb7584d3958d71834 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 16 Jan 2020 11:47:20 -0800 Subject: [PATCH 05/12] add test. --- cache_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/cache_test.go b/cache_test.go index 1c3bd5ae..9986c5eb 100644 --- a/cache_test.go +++ b/cache_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/dgraph-io/ristretto/z" + "github.com/stretchr/testify/require" ) var wait time.Duration = time.Millisecond * 10 @@ -345,6 +346,56 @@ func TestCacheSet(t *testing.T) { } } +func TestCacheSetWithTTL(t *testing.T) { + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + Metrics: true, + }) + if err != nil { + panic(err) + } + if c.Set(1, 1, 1) { + time.Sleep(wait) + if val, ok := c.Get(1); val == nil || val.(int) != 1 || !ok { + t.Fatal("set/get returned wrong value") + } + } else { + if val, ok := c.Get(1); val != nil || ok { + t.Fatal("set was dropped but value still added") + } + } + c.Set(1, 2, 2) + val, ok := c.store.Get(z.KeyToHash(1)) + if val == nil || val.(int) != 2 || !ok { + t.Fatal("set/update was unsuccessful") + } + c.stop <- struct{}{} + for i := 0; i < setBufSize; i++ { + key, conflict := z.KeyToHash(1) + c.setBuf <- &item{ + flag: itemUpdate, + key: key, + conflict: conflict, + value: 1, + cost: 1, + } + } + if c.Set(2, 2, 1) { + t.Fatal("set should be dropped with full setBuf") + } + if c.Metrics.SetsDropped() != 1 { + t.Fatal("set should track dropSets") + } + close(c.setBuf) + close(c.stop) + c = nil + if c.Set(1, 1, 1) { + t.Fatal("set shouldn't be successful with nil cache") + } +} + func TestCacheDel(t *testing.T) { c, err := NewCache(&Config{ NumCounters: 100, From ce32e56f242d457fdd6c6af0cc36d01d479e5983 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 16 Jan 2020 15:36:16 -0800 Subject: [PATCH 06/12] Add working test. --- cache.go | 9 +++-- cache_test.go | 91 ++++++++++++++++++++++++++++----------------------- store.go | 35 +++++++++++++------- store_test.go | 57 +++++++++++++++++++------------- ttl.go | 9 +++-- 5 files changed, 120 insertions(+), 81 deletions(-) diff --git a/cache.go b/cache.go index fce03ee4..248ffc4d 100644 --- a/cache.go +++ b/cache.go @@ -222,9 +222,9 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration cost: cost, expiration: expiration, } - // attempt to immediately update hashmap value and set flag to update so the - // cost is eventually updated - if c.store.Update(keyHash, conflictHash, i.value) { + // Attempt to immediately update hashmap value and set flag to update so the + // cost is eventually updated. The expiration must also be immediately updated + if c.store.Update(i, c.ttlMap) { i.flag = itemUpdate } // attempt to send item to policy @@ -313,6 +313,9 @@ func (c *Cache) processItems() { c.Metrics.add(keyAdd, i.key, 1) } for _, victim := range victims { + if victimExp := c.store.Expiration(victim.key); !victimExp.IsZero() { + c.ttlMap.Delete(victim.key, victimExp) + } victim.conflict, victim.value = c.store.Del(victim.key, 0) if c.onEvict != nil { c.onEvict(victim.key, victim.conflict, victim.value, victim.cost) diff --git a/cache_test.go b/cache_test.go index 9986c5eb..c89b7f95 100644 --- a/cache_test.go +++ b/cache_test.go @@ -275,9 +275,9 @@ func TestCacheGet(t *testing.T) { } key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } c.store.Set(&i) if val, ok := c.Get(1); val == nil || !ok { @@ -347,53 +347,62 @@ func TestCacheSet(t *testing.T) { } func TestCacheSetWithTTL(t *testing.T) { + m := &sync.Mutex{} + evicted := make(map[uint64]struct{}) c, err := NewCache(&Config{ NumCounters: 100, MaxCost: 10, BufferItems: 64, Metrics: true, + OnEvict: func(key, conflict uint64, value interface{}, cost int64) { + m.Lock() + defer m.Unlock() + evicted[key] = struct{}{} + }, }) - if err != nil { - panic(err) - } - if c.Set(1, 1, 1) { - time.Sleep(wait) - if val, ok := c.Get(1); val == nil || val.(int) != 1 || !ok { - t.Fatal("set/get returned wrong value") - } - } else { - if val, ok := c.Get(1); val != nil || ok { - t.Fatal("set was dropped but value still added") - } - } - c.Set(1, 2, 2) - val, ok := c.store.Get(z.KeyToHash(1)) - if val == nil || val.(int) != 2 || !ok { - t.Fatal("set/update was unsuccessful") - } - c.stop <- struct{}{} - for i := 0; i < setBufSize; i++ { - key, conflict := z.KeyToHash(1) - c.setBuf <- &item{ - flag: itemUpdate, - key: key, - conflict: conflict, - value: 1, - cost: 1, + require.NoError(t, err) + + // retrySet calls SetWithTTL until the item is accepted by the cache. + retrySet := func(key, value int, cost int64, ttl time.Duration) { + for { + if set := c.SetWithTTL(key, value, cost, ttl); !set { + time.Sleep(wait) + continue + } + + time.Sleep(wait) + val, ok := c.Get(key) + require.True(t, ok) + require.NotNil(t, val) + require.Equal(t, value, val.(int)) + return } } - if c.Set(2, 2, 1) { - t.Fatal("set should be dropped with full setBuf") - } - if c.Metrics.SetsDropped() != 1 { - t.Fatal("set should track dropSets") - } - close(c.setBuf) - close(c.stop) - c = nil - if c.Set(1, 1, 1) { - t.Fatal("set shouldn't be successful with nil cache") - } + + retrySet(1, 1, 1, 3*time.Second) + + // Sleep to make sure the item has expired after execution resumes. + time.Sleep(5 * time.Second) + val, ok := c.Get(1) + require.False(t, ok) + require.Nil(t, val) + + // Sleep to ensure that the bucket where the item was stored has been cleared + // from the expiraton map. + time.Sleep(5 * time.Second) + m.Lock() + require.Equal(t, 1, len(evicted)) + _, ok = evicted[1] + require.True(t, ok) + m.Unlock() + + // Verify that expiration times are overwritten. + retrySet(2, 1, 1, time.Second) + retrySet(2, 1, 1, 100*time.Second) + time.Sleep(5 * time.Second) + val, ok = c.Get(2) + require.True(t, ok) + require.Equal(t, 1, val.(int)) } func TestCacheDel(t *testing.T) { diff --git a/store.go b/store.go index 78821e12..8d18849f 100644 --- a/store.go +++ b/store.go @@ -47,7 +47,7 @@ type store interface { Del(uint64, uint64) (uint64, interface{}) // Update attempts to update the key with a new value and returns true if // successful. - Update(uint64, uint64, interface{}) bool + Update(*item, *expirationMap) bool // Clear clears all contents of the store. Clear() } @@ -94,8 +94,8 @@ func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) { return sm.shards[key%numShards].Del(key, conflict) } -func (sm *shardedMap) Update(key, conflict uint64, value interface{}) bool { - return sm.shards[key%numShards].Update(key, conflict, value) +func (sm *shardedMap) Update(i *item, m *expirationMap) bool { + return sm.shards[i.key%numShards].Update(i, m) } func (sm *shardedMap) Clear() { @@ -128,7 +128,7 @@ func (m *lockedMap) Get(key, conflict uint64) (interface{}, bool) { // Handle expired items. if !item.expiration.IsZero() { - if item.expiration.After(time.Now()) { + if time.Now().After(item.expiration) { return nil, false } } @@ -188,22 +188,35 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) { return item.conflict, item.value } -func (m *lockedMap) Update(key, conflict uint64, value interface{}) bool { +func (m *lockedMap) Update(newItem *item, eMap *expirationMap) bool { m.Lock() - item, ok := m.data[key] + item, ok := m.data[newItem.key] if !ok { m.Unlock() return false } - if conflict != 0 && (conflict != item.conflict) { + if newItem.conflict != 0 && (newItem.conflict != item.conflict) { m.Unlock() return false } - m.data[key] = storeItem{ - key: key, - conflict: conflict, - value: value, + + // Delete existing item from the expirationMap. + if !item.expiration.IsZero() && eMap != nil { + eMap.Delete(item.key, item.expiration) + } + + m.data[newItem.key] = storeItem{ + key: newItem.key, + conflict: newItem.conflict, + value: newItem.value, + expiration: newItem.expiration, + } + + // Add new expiration to the expiration time. + if !newItem.expiration.IsZero() && eMap != nil { + eMap.Add(newItem.key, newItem.conflict, newItem.expiration) } + m.Unlock() return true } diff --git a/store_test.go b/store_test.go index 9a338c8c..e76a2ead 100644 --- a/store_test.go +++ b/store_test.go @@ -10,9 +10,9 @@ func TestStoreSetGet(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 2, + value: 2, } s.Set(&i) if val, ok := s.Get(key, conflict); (val == nil || !ok) || val.(int) != 2 { @@ -25,9 +25,9 @@ func TestStoreSetGet(t *testing.T) { } key, conflict = z.KeyToHash(2) i = item{ - key: key, + key: key, conflict: conflict, - value: 2, + value: 2, } s.Set(&i) if val, ok := s.Get(key, conflict); !ok || val.(int) != 2 { @@ -39,9 +39,9 @@ func TestStoreDel(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } s.Set(&i) s.Del(key, conflict) @@ -56,9 +56,9 @@ func TestStoreClear(t *testing.T) { for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) it := item{ - key: key, + key: key, conflict: conflict, - value: i, + value: i, } s.Set(&it) } @@ -75,12 +75,13 @@ func TestStoreUpdate(t *testing.T) { s := newStore() key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } s.Set(&i) - if updated := s.Update(key, conflict, 2); !updated { + i.value = 2 + if updated := s.Update(&i, nil); !updated { t.Fatal("value should have been updated") } if val, ok := s.Get(key, conflict); val == nil || !ok { @@ -89,14 +90,20 @@ func TestStoreUpdate(t *testing.T) { if val, ok := s.Get(key, conflict); val.(int) != 2 || !ok { t.Fatal("value wasn't updated") } - if !s.Update(key, conflict, 3) { + i.value = 3 + if !s.Update(&i, nil) { t.Fatal("value should have been updated") } if val, ok := s.Get(key, conflict); val.(int) != 3 || !ok { t.Fatal("value wasn't updated") } key, conflict = z.KeyToHash(2) - if updated := s.Update(key, conflict, 2); updated { + i = item{ + key: key, + conflict: conflict, + value: 2, + } + if updated := s.Update(&i, nil); updated { t.Fatal("value should not have been updated") } if val, ok := s.Get(key, conflict); val != nil || ok { @@ -117,15 +124,15 @@ func TestStoreCollision(t *testing.T) { t.Fatal("collision should return nil") } i := item{ - key: 1, + key: 1, conflict: 1, - value: 2, + value: 2, } s.Set(&i) if val, ok := s.Get(1, 0); !ok || val == nil || val.(int) == 2 { t.Fatal("collision should prevent Set update") } - if s.Update(1, 1, 2) { + if s.Update(&i, nil) { t.Fatal("collision should prevent Update") } if val, ok := s.Get(1, 0); !ok || val == nil || val.(int) == 2 { @@ -141,9 +148,9 @@ func BenchmarkStoreGet(b *testing.B) { s := newStore() key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } s.Set(&i) b.SetBytes(1) @@ -161,9 +168,9 @@ func BenchmarkStoreSet(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } s.Set(&i) } @@ -174,15 +181,19 @@ func BenchmarkStoreUpdate(b *testing.B) { s := newStore() key, conflict := z.KeyToHash(1) i := item{ - key: key, + key: key, conflict: conflict, - value: 1, + value: 1, } s.Set(&i) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - s.Update(key, conflict, 2) + s.Update(&item{ + key: key, + conflict: conflict, + value: 2, + }, nil) } }) } diff --git a/ttl.go b/ttl.go index d0f1c668..201eeec0 100644 --- a/ttl.go +++ b/ttl.go @@ -71,15 +71,18 @@ func (m *expirationMap) Delete(key uint64, expiration time.Time) { } func (m *expirationMap) CleanUp(store store, policy policy, onEvict onEvictFunc) { - bucketNum := timeToBucket(time.Now()) + // Get the bucket number for the current time and substract one. There might be + // items in the current bucket that have not expired yet but all the items in + // the previous bucket should have expired. + bucketNum := timeToBucket(time.Now()) - 1 m.Lock() keys := m.buckets[bucketNum] delete(m.buckets, bucketNum) m.Unlock() - for _, key := range keys { - conflict, value := store.Del(key, 0) + for key, conflict := range keys { + _, value := store.Del(key, 0) cost := policy.Cost(key) if onEvict != nil { onEvict(key, conflict, value, cost) From a0ce592b6b34ff1669b0569a24abb5891703b813 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 16 Jan 2020 17:00:48 -0800 Subject: [PATCH 07/12] rename vars. --- store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store.go b/store.go index 8d18849f..1acbfd51 100644 --- a/store.go +++ b/store.go @@ -94,8 +94,8 @@ func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) { return sm.shards[key%numShards].Del(key, conflict) } -func (sm *shardedMap) Update(i *item, m *expirationMap) bool { - return sm.shards[i.key%numShards].Update(i, m) +func (sm *shardedMap) Update(newItem *item, eMap *expirationMap) bool { + return sm.shards[newItem.key%numShards].Update(newItem, eMap) } func (sm *shardedMap) Clear() { From 785ebbe27b92ac8585a955e64f7da5f3202a44d1 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 17 Jan 2020 11:42:29 -0800 Subject: [PATCH 08/12] Andd and fix comments. --- cache.go | 1 + ttl.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cache.go b/cache.go index 248ffc4d..07ec504d 100644 --- a/cache.go +++ b/cache.go @@ -224,6 +224,7 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration } // Attempt to immediately update hashmap value and set flag to update so the // cost is eventually updated. The expiration must also be immediately updated + // to prevent items from being prematurely removed from the map. if c.store.Update(i, c.ttlMap) { i.flag = itemUpdate } diff --git a/ttl.go b/ttl.go index 201eeec0..23250ec6 100644 --- a/ttl.go +++ b/ttl.go @@ -26,13 +26,16 @@ const ( bucketSizeSecs = 5 ) +// timeToBucket converts a time into a bucket number that will be used +// to store items in the expiration map. func timeToBucket(t time.Time) int { return t.Second() / bucketSizeSecs } -// Map of key to conflict. +// bucket type is a map of key to conflict. type bucket map[uint64]uint64 +// expirationMap is a map of bucket number to the corresponding bucket. type expirationMap struct { sync.RWMutex buckets map[int]bucket @@ -44,7 +47,9 @@ func newExpirationMap() *expirationMap { } } +// Add adds a key-conflict pair to the bucket for this expiration time. func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { + // Items that don't expire don't need to be in the expiration map. if expiration.IsZero() { return } @@ -52,6 +57,7 @@ func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { bucketNum := timeToBucket(expiration) m.Lock() defer m.Unlock() + _, ok := m.buckets[bucketNum] if !ok { m.buckets[bucketNum] = make(bucket) @@ -59,6 +65,8 @@ func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { m.buckets[bucketNum][key] = conflict } +// Delete removes the key-conflict pair from the expiration map. The expiration time +// is needed to be able to find the bucket storing this pair in constant time. func (m *expirationMap) Delete(key uint64, expiration time.Time) { bucketNum := timeToBucket(expiration) m.Lock() @@ -70,6 +78,9 @@ func (m *expirationMap) Delete(key uint64, expiration time.Time) { delete(m.buckets[bucketNum], key) } +// CleanUp removes all the items in the bucket that was just completed. It deletes +// those items from the store, and calls the onEvict function on those items. +// This function is meant to be called periodically. func (m *expirationMap) CleanUp(store store, policy policy, onEvict onEvictFunc) { // Get the bucket number for the current time and substract one. There might be // items in the current bucket that have not expired yet but all the items in From 02776d462ea34c0dcfb8df0629d41c340c039fc0 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 22 Jan 2020 18:04:20 -0800 Subject: [PATCH 09/12] Address review comments. --- cache.go | 42 +++++++++------------- cache_test.go | 18 ++++++---- store.go | 54 ++++++++++++++++------------ store_test.go | 26 +++++++------- ttl.go | 97 ++++++++++++++++++++++++++++++++++++++------------- 5 files changed, 146 insertions(+), 91 deletions(-) diff --git a/cache.go b/cache.go index 01b45d18..ca4f8341 100644 --- a/cache.go +++ b/cache.go @@ -60,9 +60,6 @@ type Cache struct { stop chan struct{} // cost calculates cost from a value. cost func(value interface{}) int64 - // ttlMap is used to store information about which entries to delete once their - // expiration time has passed. - ttlMap *expirationMap // cleanupTicker is used to periodically check for entries whose TTL has passed. cleanupTicker *time.Ticker // Metrics contains a running log of important statistics like hits, misses, @@ -143,7 +140,7 @@ func NewCache(config *Config) (*Cache, error) { } policy := newPolicy(config.NumCounters, config.MaxCost) cache := &Cache{ - store: newStore(), + store: newStore(newExpirationMap()), policy: policy, getBuf: newRingBuffer(policy, config.BufferItems), setBuf: make(chan *item, setBufSize), @@ -151,8 +148,7 @@ func NewCache(config *Config) (*Cache, error) { keyToHash: config.KeyToHash, stop: make(chan struct{}), cost: config.Cost, - ttlMap: newExpirationMap(), - cleanupTicker: time.NewTicker(bucketSizeSecs * time.Second), + cleanupTicker: time.NewTicker(bucketSize), } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash @@ -199,18 +195,24 @@ func (c *Cache) Set(key, value interface{}, cost int64) bool { } // SetWithTTL works like Set but adds a key-value pair to the cache that will expire -// after the specified TTL (time to live) has passed. A zero or negative value will -// cause the value to never expire, which is identical to calling Set. +// after the specified TTL (time to live) has passed. A zero value means the value never +// exexpire, which is identical to calling Set. A negative value is a no-op and the value +// is discarded. func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool { if c == nil || key == nil { return false } - now := time.Now() - expiration := now.Add(ttl) - if !expiration.After(now) { - // The TTL is either zero or negative. Treat this item as one without a TTL. - expiration = time.Time{} + var expiration time.Time + switch { + case ttl == 0: + // No expiration. + break + case ttl < 0: + // Treat this a a no-op. + return false + default: + expiration = time.Now().Add(ttl) } keyHash, conflictHash := c.keyToHash(key) @@ -222,10 +224,9 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration cost: cost, expiration: expiration, } - // Attempt to immediately update hashmap value and set flag to update so the // cost is eventually updated. The expiration must also be immediately updated // to prevent items from being prematurely removed from the map. - if c.store.Update(i, c.ttlMap) { + if c.store.Update(i) { i.flag = itemUpdate } // Attempt to send item to policy. @@ -305,19 +306,10 @@ func (c *Cache) processItems() { case itemNew: victims, added := c.policy.Add(i.key, i.cost) if added { - currentExpiration := c.store.Expiration(i.key) - if !currentExpiration.IsZero() { - c.ttlMap.Delete(i.key, currentExpiration) - } - c.store.Set(i) - c.ttlMap.Add(i.key, i.conflict, i.expiration) c.Metrics.add(keyAdd, i.key, 1) } for _, victim := range victims { - if victimExp := c.store.Expiration(victim.key); !victimExp.IsZero() { - c.ttlMap.Delete(victim.key, victimExp) - } victim.conflict, victim.value = c.store.Del(victim.key, 0) if c.onEvict != nil { c.onEvict(victim.key, victim.conflict, victim.value, victim.cost) @@ -332,7 +324,7 @@ func (c *Cache) processItems() { c.store.Del(i.key, i.conflict) } case <-c.cleanupTicker.C: - c.ttlMap.CleanUp(c.store, c.policy, c.onEvict) + c.store.Cleanup(c.policy, c.onEvict) case <-c.stop: return } diff --git a/cache_test.go b/cache_test.go index b0b74839..abe68aae 100644 --- a/cache_test.go +++ b/cache_test.go @@ -377,10 +377,10 @@ func TestCacheSetWithTTL(t *testing.T) { } } - retrySet(1, 1, 1, 3*time.Second) + retrySet(1, 1, 1, time.Second) // Sleep to make sure the item has expired after execution resumes. - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) val, ok := c.Get(1) require.False(t, ok) require.Nil(t, val) @@ -395,12 +395,12 @@ func TestCacheSetWithTTL(t *testing.T) { m.Unlock() // Verify that expiration times are overwritten. - retrySet(2, 1, 1, time.Second) - retrySet(2, 1, 1, 100*time.Second) - time.Sleep(5 * time.Second) + retrySet(2, 1, 1, 100*time.Millisecond) + retrySet(2, 2, 1, 100*time.Second) + time.Sleep(3 * time.Second) val, ok = c.Get(2) require.True(t, ok) - require.Equal(t, 1, val.(int)) + require.Equal(t, 2, val.(int)) } func TestCacheDel(t *testing.T) { @@ -589,3 +589,9 @@ func TestCacheMetricsClear(t *testing.T) { c.Metrics = nil c.Metrics.Clear() } + +func TestMain(m *testing.M) { + // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. + bucketSize = time.Second + m.Run() +} diff --git a/store.go b/store.go index 1acbfd51..1ae1feba 100644 --- a/store.go +++ b/store.go @@ -47,34 +47,38 @@ type store interface { Del(uint64, uint64) (uint64, interface{}) // Update attempts to update the key with a new value and returns true if // successful. - Update(*item, *expirationMap) bool + Update(*item) bool + // Cleanup removes items that have an expired TTL. + Cleanup(policy policy, onEvict onEvictFunc) // Clear clears all contents of the store. Clear() } // newStore returns the default store implementation. -func newStore() store { - return newShardedMap() +func newStore(m *expirationMap) store { + return newShardedMap(m) } const numShards uint64 = 256 type shardedMap struct { shards []*lockedMap + em *expirationMap } -func newShardedMap() *shardedMap { +func newShardedMap(em *expirationMap) *shardedMap { sm := &shardedMap{ shards: make([]*lockedMap, int(numShards)), + em: em, } for i := range sm.shards { - sm.shards[i] = newLockedMap() + sm.shards[i] = newLockedMap(em) } return sm } func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) { - return sm.shards[key%numShards].Get(key, conflict) + return sm.shards[key%numShards].get(key, conflict) } func (sm *shardedMap) Expiration(key uint64) time.Time { @@ -94,8 +98,12 @@ func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) { return sm.shards[key%numShards].Del(key, conflict) } -func (sm *shardedMap) Update(newItem *item, eMap *expirationMap) bool { - return sm.shards[newItem.key%numShards].Update(newItem, eMap) +func (sm *shardedMap) Update(newItem *item) bool { + return sm.shards[newItem.key%numShards].Update(newItem) +} + +func (sm *shardedMap) Cleanup(policy policy, onEvict onEvictFunc) { + sm.em.cleanup(sm, policy, onEvict) } func (sm *shardedMap) Clear() { @@ -107,15 +115,17 @@ func (sm *shardedMap) Clear() { type lockedMap struct { sync.RWMutex data map[uint64]storeItem + em *expirationMap } -func newLockedMap() *lockedMap { +func newLockedMap(em *expirationMap) *lockedMap { return &lockedMap{ data: make(map[uint64]storeItem), + em: em, } } -func (m *lockedMap) Get(key, conflict uint64) (interface{}, bool) { +func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) { m.RLock() item, ok := m.data[key] m.RUnlock() @@ -149,7 +159,11 @@ func (m *lockedMap) Set(i *item) { m.Lock() item, ok := m.data[i.key] - if !ok { + + if ok { + m.em.update(i.key, i.conflict, item.expiration, i.expiration) + } else { + m.em.add(i.key, i.conflict, i.expiration) m.data[i.key] = storeItem{ key: i.key, conflict: i.conflict, @@ -183,12 +197,17 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) { m.Unlock() return 0, nil } + + if !item.expiration.IsZero() { + m.em.del(key, item.expiration) + } + delete(m.data, key) m.Unlock() return item.conflict, item.value } -func (m *lockedMap) Update(newItem *item, eMap *expirationMap) bool { +func (m *lockedMap) Update(newItem *item) bool { m.Lock() item, ok := m.data[newItem.key] if !ok { @@ -200,11 +219,7 @@ func (m *lockedMap) Update(newItem *item, eMap *expirationMap) bool { return false } - // Delete existing item from the expirationMap. - if !item.expiration.IsZero() && eMap != nil { - eMap.Delete(item.key, item.expiration) - } - + m.em.update(newItem.key, newItem.conflict, item.expiration, newItem.expiration) m.data[newItem.key] = storeItem{ key: newItem.key, conflict: newItem.conflict, @@ -212,11 +227,6 @@ func (m *lockedMap) Update(newItem *item, eMap *expirationMap) bool { expiration: newItem.expiration, } - // Add new expiration to the expiration time. - if !newItem.expiration.IsZero() && eMap != nil { - eMap.Add(newItem.key, newItem.conflict, newItem.expiration) - } - m.Unlock() return true } diff --git a/store_test.go b/store_test.go index e76a2ead..5d6bd17f 100644 --- a/store_test.go +++ b/store_test.go @@ -7,7 +7,7 @@ import ( ) func TestStoreSetGet(t *testing.T) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -36,7 +36,7 @@ func TestStoreSetGet(t *testing.T) { } func TestStoreDel(t *testing.T) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -52,7 +52,7 @@ func TestStoreDel(t *testing.T) { } func TestStoreClear(t *testing.T) { - s := newStore() + s := newStore(nil) for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) it := item{ @@ -72,7 +72,7 @@ func TestStoreClear(t *testing.T) { } func TestStoreUpdate(t *testing.T) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -81,7 +81,7 @@ func TestStoreUpdate(t *testing.T) { } s.Set(&i) i.value = 2 - if updated := s.Update(&i, nil); !updated { + if updated := s.Update(&i); !updated { t.Fatal("value should have been updated") } if val, ok := s.Get(key, conflict); val == nil || !ok { @@ -91,7 +91,7 @@ func TestStoreUpdate(t *testing.T) { t.Fatal("value wasn't updated") } i.value = 3 - if !s.Update(&i, nil) { + if !s.Update(&i) { t.Fatal("value should have been updated") } if val, ok := s.Get(key, conflict); val.(int) != 3 || !ok { @@ -103,7 +103,7 @@ func TestStoreUpdate(t *testing.T) { conflict: conflict, value: 2, } - if updated := s.Update(&i, nil); updated { + if updated := s.Update(&i); updated { t.Fatal("value should not have been updated") } if val, ok := s.Get(key, conflict); val != nil || ok { @@ -112,7 +112,7 @@ func TestStoreUpdate(t *testing.T) { } func TestStoreCollision(t *testing.T) { - s := newShardedMap() + s := newShardedMap(nil) s.shards[1].Lock() s.shards[1].data[1] = storeItem{ key: 1, @@ -132,7 +132,7 @@ func TestStoreCollision(t *testing.T) { if val, ok := s.Get(1, 0); !ok || val == nil || val.(int) == 2 { t.Fatal("collision should prevent Set update") } - if s.Update(&i, nil) { + if s.Update(&i) { t.Fatal("collision should prevent Update") } if val, ok := s.Get(1, 0); !ok || val == nil || val.(int) == 2 { @@ -145,7 +145,7 @@ func TestStoreCollision(t *testing.T) { } func BenchmarkStoreGet(b *testing.B) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -162,7 +162,7 @@ func BenchmarkStoreGet(b *testing.B) { } func BenchmarkStoreSet(b *testing.B) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { @@ -178,7 +178,7 @@ func BenchmarkStoreSet(b *testing.B) { } func BenchmarkStoreUpdate(b *testing.B) { - s := newStore() + s := newStore(nil) key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -193,7 +193,7 @@ func BenchmarkStoreUpdate(b *testing.B) { key: key, conflict: conflict, value: 2, - }, nil) + }) } }) } diff --git a/ttl.go b/ttl.go index 23250ec6..42b22d21 100644 --- a/ttl.go +++ b/ttl.go @@ -17,19 +17,26 @@ package ristretto import ( + "math" "sync" "time" ) -const ( +var ( // TODO: find the optimal value or make it configurable. - bucketSizeSecs = 5 + bucketSize = 5 * time.Second ) -// timeToBucket converts a time into a bucket number that will be used -// to store items in the expiration map. -func timeToBucket(t time.Time) int { - return t.Second() / bucketSizeSecs +func storageBucket(t time.Time) int64 { + bucket := int64(math.Ceil(float64(t.Unix()) / bucketSize.Seconds())) + return bucket +} + +func cleanupBucket(t time.Time) int64 { + // The bucket to cleanup is always behind the storage bucket by one so that + // no elements in that bucket (which might not have expired yet) are deleted. + bucket := storageBucket(t) - 1 + return bucket } // bucket type is a map of key to conflict. @@ -38,37 +45,71 @@ type bucket map[uint64]uint64 // expirationMap is a map of bucket number to the corresponding bucket. type expirationMap struct { sync.RWMutex - buckets map[int]bucket + buckets map[int64]bucket } func newExpirationMap() *expirationMap { return &expirationMap{ - buckets: make(map[int]bucket), + buckets: make(map[int64]bucket), } } -// Add adds a key-conflict pair to the bucket for this expiration time. -func (m *expirationMap) Add(key, conflict uint64, expiration time.Time) { +func (m *expirationMap) add(key, conflict uint64, expiration time.Time) { + if m == nil { + return + } + // Items that don't expire don't need to be in the expiration map. if expiration.IsZero() { return } - bucketNum := timeToBucket(expiration) + bucketNum := storageBucket(expiration) m.Lock() defer m.Unlock() - _, ok := m.buckets[bucketNum] + b, ok := m.buckets[bucketNum] + if !ok { + b = make(bucket) + m.buckets[bucketNum] = b + } + b[key] = conflict +} + +func (m *expirationMap) update(key, conflict uint64, oldExpiration, newExpiration time.Time) { + if m == nil { + return + } + + if oldExpiration.IsZero() { + // Nothing to update. + return + } + + m.Lock() + defer m.Unlock() + + oldBucketNum := storageBucket(oldExpiration) + oldBucket, ok := m.buckets[oldBucketNum] + if ok { + delete(oldBucket, key) + } + + newBucketNum := storageBucket(newExpiration) + newBucket, ok := m.buckets[newBucketNum] if !ok { - m.buckets[bucketNum] = make(bucket) + newBucket = make(bucket) + m.buckets[newBucketNum] = newBucket } - m.buckets[bucketNum][key] = conflict + newBucket[key] = conflict } -// Delete removes the key-conflict pair from the expiration map. The expiration time -// is needed to be able to find the bucket storing this pair in constant time. -func (m *expirationMap) Delete(key uint64, expiration time.Time) { - bucketNum := timeToBucket(expiration) +func (m *expirationMap) del(key uint64, expiration time.Time) { + if m == nil { + return + } + + bucketNum := storageBucket(expiration) m.Lock() defer m.Unlock() _, ok := m.buckets[bucketNum] @@ -78,22 +119,28 @@ func (m *expirationMap) Delete(key uint64, expiration time.Time) { delete(m.buckets[bucketNum], key) } -// CleanUp removes all the items in the bucket that was just completed. It deletes +// cleanup removes all the items in the bucket that was just completed. It deletes // those items from the store, and calls the onEvict function on those items. // This function is meant to be called periodically. -func (m *expirationMap) CleanUp(store store, policy policy, onEvict onEvictFunc) { - // Get the bucket number for the current time and substract one. There might be - // items in the current bucket that have not expired yet but all the items in - // the previous bucket should have expired. - bucketNum := timeToBucket(time.Now()) - 1 +func (m *expirationMap) cleanup(store store, policy policy, onEvict onEvictFunc) { + if m == nil { + return + } + now := time.Now() + bucketNum := cleanupBucket(now) m.Lock() keys := m.buckets[bucketNum] delete(m.buckets, bucketNum) m.Unlock() for key, conflict := range keys { - _, value := store.Del(key, 0) + // Sanity check. Verify that the store agrees that this key is expired. + if store.Expiration(key).After(now) { + continue + } + + _, value := store.Del(key, conflict) cost := policy.Cost(key) if onEvict != nil { onEvict(key, conflict, value, cost) From d079af8c4ea2b8308bf62659217e005b3ae4aa06 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 24 Jan 2020 14:51:20 -0800 Subject: [PATCH 10/12] Add os.Exit call in TestMain. --- cache_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cache_test.go b/cache_test.go index abe68aae..4eab4d3b 100644 --- a/cache_test.go +++ b/cache_test.go @@ -2,6 +2,7 @@ package ristretto import ( "math/rand" + "os" "strings" "sync" "testing" @@ -594,4 +595,5 @@ func TestMain(m *testing.M) { // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. bucketSize = time.Second m.Run() + os.Exit(0) } From 12a24da3dc879f59229473ff06a0c33107f2882d Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 24 Jan 2020 17:40:40 -0800 Subject: [PATCH 11/12] address review comments. --- cache.go | 4 ++-- cache_test.go | 2 +- store.go | 24 +++++++++++------------- store_test.go | 16 ++++++++-------- ttl.go | 22 +++++++--------------- 5 files changed, 29 insertions(+), 39 deletions(-) diff --git a/cache.go b/cache.go index ca4f8341..5da66a39 100644 --- a/cache.go +++ b/cache.go @@ -140,7 +140,7 @@ func NewCache(config *Config) (*Cache, error) { } policy := newPolicy(config.NumCounters, config.MaxCost) cache := &Cache{ - store: newStore(newExpirationMap()), + store: newStore(), policy: policy, getBuf: newRingBuffer(policy, config.BufferItems), setBuf: make(chan *item, setBufSize), @@ -148,7 +148,7 @@ func NewCache(config *Config) (*Cache, error) { keyToHash: config.KeyToHash, stop: make(chan struct{}), cost: config.Cost, - cleanupTicker: time.NewTicker(bucketSize), + cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2), } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash diff --git a/cache_test.go b/cache_test.go index 4eab4d3b..9cf70d30 100644 --- a/cache_test.go +++ b/cache_test.go @@ -593,7 +593,7 @@ func TestCacheMetricsClear(t *testing.T) { func TestMain(m *testing.M) { // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. - bucketSize = time.Second + bucketDurationSecs = 1 m.Run() os.Exit(0) } diff --git a/store.go b/store.go index 1ae1feba..f75c23ea 100644 --- a/store.go +++ b/store.go @@ -55,24 +55,24 @@ type store interface { } // newStore returns the default store implementation. -func newStore(m *expirationMap) store { - return newShardedMap(m) +func newStore() store { + return newShardedMap() } const numShards uint64 = 256 type shardedMap struct { - shards []*lockedMap - em *expirationMap + shards []*lockedMap + expiryMap *expirationMap } -func newShardedMap(em *expirationMap) *shardedMap { +func newShardedMap() *shardedMap { sm := &shardedMap{ - shards: make([]*lockedMap, int(numShards)), - em: em, + shards: make([]*lockedMap, int(numShards)), + expiryMap: newExpirationMap(), } for i := range sm.shards { - sm.shards[i] = newLockedMap(em) + sm.shards[i] = newLockedMap(sm.expiryMap) } return sm } @@ -103,7 +103,7 @@ func (sm *shardedMap) Update(newItem *item) bool { } func (sm *shardedMap) Cleanup(policy policy, onEvict onEvictFunc) { - sm.em.cleanup(sm, policy, onEvict) + sm.expiryMap.cleanup(sm, policy, onEvict) } func (sm *shardedMap) Clear() { @@ -137,10 +137,8 @@ func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) { } // Handle expired items. - if !item.expiration.IsZero() { - if time.Now().After(item.expiration) { - return nil, false - } + if !item.expiration.IsZero() && time.Now().After(item.expiration) { + return nil, false } return item.value, true } diff --git a/store_test.go b/store_test.go index 5d6bd17f..53ee984d 100644 --- a/store_test.go +++ b/store_test.go @@ -7,7 +7,7 @@ import ( ) func TestStoreSetGet(t *testing.T) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -36,7 +36,7 @@ func TestStoreSetGet(t *testing.T) { } func TestStoreDel(t *testing.T) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -52,7 +52,7 @@ func TestStoreDel(t *testing.T) { } func TestStoreClear(t *testing.T) { - s := newStore(nil) + s := newStore() for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) it := item{ @@ -72,7 +72,7 @@ func TestStoreClear(t *testing.T) { } func TestStoreUpdate(t *testing.T) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -112,7 +112,7 @@ func TestStoreUpdate(t *testing.T) { } func TestStoreCollision(t *testing.T) { - s := newShardedMap(nil) + s := newShardedMap() s.shards[1].Lock() s.shards[1].data[1] = storeItem{ key: 1, @@ -145,7 +145,7 @@ func TestStoreCollision(t *testing.T) { } func BenchmarkStoreGet(b *testing.B) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) i := item{ key: key, @@ -162,7 +162,7 @@ func BenchmarkStoreGet(b *testing.B) { } func BenchmarkStoreSet(b *testing.B) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { @@ -178,7 +178,7 @@ func BenchmarkStoreSet(b *testing.B) { } func BenchmarkStoreUpdate(b *testing.B) { - s := newStore(nil) + s := newStore() key, conflict := z.KeyToHash(1) i := item{ key: key, diff --git a/ttl.go b/ttl.go index 42b22d21..02864f7e 100644 --- a/ttl.go +++ b/ttl.go @@ -17,26 +17,23 @@ package ristretto import ( - "math" "sync" "time" ) var ( // TODO: find the optimal value or make it configurable. - bucketSize = 5 * time.Second + bucketDurationSecs = int64(5) ) func storageBucket(t time.Time) int64 { - bucket := int64(math.Ceil(float64(t.Unix()) / bucketSize.Seconds())) - return bucket + return (t.Unix() / bucketDurationSecs) + 1 } func cleanupBucket(t time.Time) int64 { // The bucket to cleanup is always behind the storage bucket by one so that // no elements in that bucket (which might not have expired yet) are deleted. - bucket := storageBucket(t) - 1 - return bucket + return storageBucket(t) - 1 } // bucket type is a map of key to conflict. @@ -76,26 +73,21 @@ func (m *expirationMap) add(key, conflict uint64, expiration time.Time) { b[key] = conflict } -func (m *expirationMap) update(key, conflict uint64, oldExpiration, newExpiration time.Time) { +func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time.Time) { if m == nil { return } - if oldExpiration.IsZero() { - // Nothing to update. - return - } - m.Lock() defer m.Unlock() - oldBucketNum := storageBucket(oldExpiration) + oldBucketNum := storageBucket(oldExpTime) oldBucket, ok := m.buckets[oldBucketNum] if ok { delete(oldBucket, key) } - newBucketNum := storageBucket(newExpiration) + newBucketNum := storageBucket(newExpTime) newBucket, ok := m.buckets[newBucketNum] if !ok { newBucket = make(bucket) @@ -127,9 +119,9 @@ func (m *expirationMap) cleanup(store store, policy policy, onEvict onEvictFunc) return } + m.Lock() now := time.Now() bucketNum := cleanupBucket(now) - m.Lock() keys := m.buckets[bucketNum] delete(m.buckets, bucketNum) m.Unlock() From affff5486e33fd5a7b19ff69ff36882c8b682fda Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 27 Jan 2020 10:45:58 -0800 Subject: [PATCH 12/12] Add more tests. --- cache.go | 2 +- cache_test.go | 70 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/cache.go b/cache.go index 5da66a39..603647ab 100644 --- a/cache.go +++ b/cache.go @@ -196,7 +196,7 @@ func (c *Cache) Set(key, value interface{}, cost int64) bool { // SetWithTTL works like Set but adds a key-value pair to the cache that will expire // after the specified TTL (time to live) has passed. A zero value means the value never -// exexpire, which is identical to calling Set. A negative value is a no-op and the value +// expires, which is identical to calling Set. A negative value is a no-op and the value // is discarded. func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool { if c == nil || key == nil { diff --git a/cache_test.go b/cache_test.go index 9cf70d30..908219de 100644 --- a/cache_test.go +++ b/cache_test.go @@ -2,7 +2,6 @@ package ristretto import ( "math/rand" - "os" "strings" "sync" "testing" @@ -345,6 +344,23 @@ func TestCacheSet(t *testing.T) { } } +// retrySet calls SetWithTTL until the item is accepted by the cache. +func retrySet(t *testing.T, c *Cache, key, value int, cost int64, ttl time.Duration) { + for { + if set := c.SetWithTTL(key, value, cost, ttl); !set { + time.Sleep(wait) + continue + } + + time.Sleep(wait) + val, ok := c.Get(key) + require.True(t, ok) + require.NotNil(t, val) + require.Equal(t, value, val.(int)) + return + } +} + func TestCacheSetWithTTL(t *testing.T) { m := &sync.Mutex{} evicted := make(map[uint64]struct{}) @@ -361,24 +377,7 @@ func TestCacheSetWithTTL(t *testing.T) { }) require.NoError(t, err) - // retrySet calls SetWithTTL until the item is accepted by the cache. - retrySet := func(key, value int, cost int64, ttl time.Duration) { - for { - if set := c.SetWithTTL(key, value, cost, ttl); !set { - time.Sleep(wait) - continue - } - - time.Sleep(wait) - val, ok := c.Get(key) - require.True(t, ok) - require.NotNil(t, val) - require.Equal(t, value, val.(int)) - return - } - } - - retrySet(1, 1, 1, time.Second) + retrySet(t, c, 1, 1, 1, time.Second) // Sleep to make sure the item has expired after execution resumes. time.Sleep(2 * time.Second) @@ -396,12 +395,20 @@ func TestCacheSetWithTTL(t *testing.T) { m.Unlock() // Verify that expiration times are overwritten. - retrySet(2, 1, 1, 100*time.Millisecond) - retrySet(2, 2, 1, 100*time.Second) + retrySet(t, c, 2, 1, 1, time.Second) + retrySet(t, c, 2, 2, 1, 100*time.Second) time.Sleep(3 * time.Second) val, ok = c.Get(2) require.True(t, ok) require.Equal(t, 2, val.(int)) + + // Verify that entries with no expiration are overwritten. + retrySet(t, c, 3, 1, 1, 0) + retrySet(t, c, 3, 2, 1, time.Second) + time.Sleep(3 * time.Second) + val, ok = c.Get(3) + require.False(t, ok) + require.Nil(t, val) } func TestCacheDel(t *testing.T) { @@ -427,6 +434,23 @@ func TestCacheDel(t *testing.T) { c.Del(1) } +func TestCacheDelWithTTL(t *testing.T) { + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + }) + require.NoError(t, err) + retrySet(t, c, 3, 1, 1, 10*time.Second) + time.Sleep(1 * time.Second) + // Delete the item + c.Del(3) + // Ensure the key is deleted. + val, ok := c.Get(3) + require.False(t, ok) + require.Nil(t, val) +} + func TestCacheClear(t *testing.T) { c, err := NewCache(&Config{ NumCounters: 100, @@ -591,9 +615,7 @@ func TestCacheMetricsClear(t *testing.T) { c.Metrics.Clear() } -func TestMain(m *testing.M) { +func init() { // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. bucketDurationSecs = 1 - m.Run() - os.Exit(0) }