Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions net/valkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -397,12 +399,20 @@ func (vrc *ValkeyRingClient) Close() error {

func (vrc *ValkeyRingClient) startUpdater(ctx context.Context) {
old := vrc.options.Addrs
sort.Strings(old)
vrc.log.Infof("Start goroutine to update valkey instances every %s", vrc.options.UpdateInterval)
defer vrc.log.Info("Stopped goroutine to update valkey")

ticker := time.NewTicker(vrc.options.UpdateInterval)
defer ticker.Stop()

init := true
if len(old) != 0 {
vrc.SetAddrs(ctx, old)
vrc.log.Infof("Valkey updater initial set to %d shards", len(old))
init = false
}

for {

select {
Expand All @@ -413,18 +423,25 @@ func (vrc *ValkeyRingClient) startUpdater(ctx context.Context) {

addrs, err := vrc.options.AddrUpdater()
if err != nil {
vrc.log.Errorf("Failed to run valkey updater: %v", err)
vrc.log.Errorf("Failed to update valkey ring: %v", err)
continue
}
sort.Strings(addrs)

if init {
if len(addrs) != 0 {
init = false
vrc.SetAddrs(ctx, addrs)
vrc.log.Infof("Valkey updater initial set to %d shards", len(addrs))
old = addrs
}
continue
}
if !init && len(difference(addrs, old)) != 0 {

if !slices.Equal(old, addrs) {
vrc.SetAddrs(ctx, addrs)
vrc.log.Infof("Valkey updater updated old(%d) -> new(%d)", len(old), len(addrs))

old = addrs
} else if init && len(addrs) != 0 {
init = false
vrc.SetAddrs(ctx, addrs)
vrc.log.Infof("Valkey updater initial set to %d shards", len(addrs))
old = addrs
}
}
Expand Down Expand Up @@ -553,9 +570,9 @@ func difference(a, b []string) []string {
result = append(result, item)
}
}

return result
}

func intersect(slice1, slice2 []string) []string {
set := make(map[string]struct{})
for _, item := range slice1 {
Expand All @@ -569,6 +586,5 @@ func intersect(slice1, slice2 []string) []string {
delete(set, item)
}
}

return result
}
40 changes: 40 additions & 0 deletions net/valkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,46 @@ import (
"github.com/zalando/skipper/tracing/tracingtest"
)

func TestValkeyDifference(t *testing.T) {
for _, tt := range []struct {
name string
a []string
b []string
want []string
}{
{
name: "test a > b",
a: []string{"1s"},
b: []string{},
want: []string{"1s"},
},
{
name: "test a < b",
a: []string{},
b: []string{"1s"},
want: []string{},
},
{
name: "test a = b",
a: []string{"1s"},
b: []string{"1s"},
want: []string{},
},
{
name: "test a != b",
a: []string{"1s"},
b: []string{"2s"},
want: []string{"1s"},
}} {
t.Run(tt.name, func(t *testing.T) {
if got := difference(tt.a, tt.b); len(got) != len(tt.want) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the function still in use?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes in some other part which needs set operations like it is.

t.Errorf("Failed to get correct difference: Want %v, got %v", tt.want, got)
}
})
}

}

func TestValkeyContainer(t *testing.T) {
valkeyAddr, done := valkeytest.NewTestValkey(t)
defer done()
Expand Down
Loading