Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions x/ref/runtime/internal/rpc/proxymgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func newProxyManager(s serverProxyAPI, proxyName string, policy rpc.ProxyPolicy,
}

func (pm *proxyManager) selectRandomSubsetLocked(needed, available int) map[int]bool {
if needed == 0 {
return nil
}
selected := map[int]bool{}
for {
candidate := pm.rand.Intn(available)
Expand Down Expand Up @@ -160,6 +163,7 @@ func (pm *proxyManager) updateAvailableProxies(ctx *context.T) {
pm.Lock()
defer pm.Unlock()
pm.proxies = updated

}

func (pm *proxyManager) markActive(ep naming.Endpoint) {
Expand All @@ -175,8 +179,6 @@ func (pm *proxyManager) markInActive(ep naming.Endpoint) {
}

func (pm *proxyManager) connectToSingleProxy(ctx *context.T, name string, ep naming.Endpoint) {
pm.markActive(ep)
defer pm.markInActive(ep)
for delay := pm.reconnectDelay; ; delay = nextDelay(delay) {
if !pm.isAvailable(ep) {
ctx.Infof("connectToSingleProxy(%q): proxy is no longer available\n", ep)
Expand Down Expand Up @@ -212,14 +214,26 @@ func (pm *proxyManager) tryConnections(ctx *context.T, notifyCh chan struct{}) b
return false
}
for _, ep := range idle {
if !pm.canGrow() {
continue
}
pm.markActive(ep)
go func(ep naming.Endpoint) {
pm.connectToSingleProxy(ctx, pm.proxyName, ep)
notifyCh <- struct{}{}
pm.markInActive(ep)
sendNotify(notifyCh)
}(ep)
}
return true
}

func sendNotify(ch chan struct{}) {
select {
case ch <- struct{}{}:
default:
}
}

func drainNotifyChan(ch chan struct{}) {
for {
select {
Expand All @@ -230,9 +244,29 @@ func drainNotifyChan(ch chan struct{}) {
}
}

func (pm *proxyManager) watchForChanges(ctx *context.T, ch chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-time.After(pm.resolveDelay):
pm.updateAvailableProxies(ctx)
if pm.shouldGrow() && pm.canGrow() {
sendNotify(ch)
}
}
}
}

func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
notifyCh := make(chan struct{}, 10)
pm.updateAvailableProxies(ctx)
// Watch for changes in the set of available proxies so that for the
// 'all' policy, the server will connect to new proxies as they appear.
// For other policies reconnection may be little faster since the
// new set of proxies is already available.
go pm.watchForChanges(ctx, notifyCh)

for {
select {
case <-ctx.Done():
Expand All @@ -253,7 +287,7 @@ func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
drainNotifyChan(notifyCh)
}
// Wait for a change in the set of available proxies.
if pm.shouldGrow() {
if pm.shouldGrow() && !pm.canGrow() {
for {
select {
case <-ctx.Done():
Expand Down
84 changes: 71 additions & 13 deletions x/ref/runtime/internal/rpc/proxymgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,19 @@ func TestSingleProxyConnections(t *testing.T) {
}
}

func waitForExpected(sm *mockServerAPI, proxyName string, expected int) {
// Wait for the expected number of connections.
for {
time.Sleep(100 * time.Millisecond)
sm.Lock()
if len(sm.listening[proxyName]) == expected {
sm.Unlock()
break
}
sm.Unlock()
}
}

func TestMultipleProxyConnections(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
Expand Down Expand Up @@ -363,13 +376,7 @@ func TestMultipleProxyConnections(t *testing.T) {
time.Sleep(time.Millisecond * 100)
sm.setEndpoints(proxyName, eps...)

// Wait for the expected number of connections.
for {
time.Sleep(100 * time.Millisecond)
if len(sm.listening[proxyName]) == tc.expected {
break
}
}
waitForExpected(sm, proxyName, tc.expected)

// Remove the endpoints and finish the current listeners.
sm.setEndpoints(proxyName)
Expand All @@ -394,14 +401,65 @@ func TestMultipleProxyConnections(t *testing.T) {
pm.updateAvailableProxies(ctx)

// Wait for the expected number of connections.
for {
time.Sleep(100 * time.Millisecond)
if len(sm.listening[proxyName]) == tc.expected {
break
}
}
waitForExpected(sm, proxyName, tc.expected)

cancel()
// Should immediately return if the context is already canceled.
pm.manageProxyConnections(cctx)

wg.Wait()
}

}

func TestMultipleProxyConnectionExpansion(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
sm := newMockServer()
ep1 := newEndpoint("5000")
ep2 := newEndpoint("5001")
ep3 := newEndpoint("5002")

fpm := newProxyManager(sm, "proxy0", rpc.UseFirstProxy, 1)
rpm := newProxyManager(sm, "proxy1", rpc.UseRandomProxy, 1)
apm := newProxyManager(sm, "proxy2", rpc.UseAllProxies, 3)

for i, tc := range []struct {
pm *proxyManager
initial int
final int
}{
{fpm, 1, 1},
{rpm, 1, 1},
{apm, 1, 3},
} {
cctx, cancel := context.WithCancel(ctx)
pm := tc.pm
// tune down the delays
pm.resolveDelay = time.Millisecond
pm.reconnectDelay = time.Millisecond
proxyName := fmt.Sprintf("proxy%v", i)
ch := make(chan struct{})
sm.setChan(proxyName, ch)

var wg sync.WaitGroup
wg.Add(1)
go func(pm *proxyManager) {
pm.manageProxyConnections(cctx)
wg.Done()
}(pm)

sm.setEndpoints(proxyName, ep1)

waitForExpected(sm, proxyName, tc.initial)
sm.setEndpoints(proxyName, ep1, ep2, ep3)
time.Sleep(100 * time.Millisecond)

// Wait for the expected number of connections.
waitForExpected(sm, proxyName, tc.final)

cancel()

// Should immediately return if the context is already canceled.
pm.manageProxyConnections(cctx)

Expand Down
11 changes: 9 additions & 2 deletions x/ref/runtime/internal/rpc/test/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,15 @@ func TestStartCallErrors(t *testing.T) { //nolint:gocyclo
if !errors.Is(err, verror.ErrNoServers) {
t.Errorf("wrong error: %s", err)
}
if want := "connection refused"; !strings.Contains(verror.DebugString(err), want) {
t.Errorf("wrong error: %s - doesn't contain %q", err, want)
found := false
allowed := []string{"connection reset by peer", "connection refused"}
for _, want := range allowed {
if strings.Contains(verror.DebugString(err), want) {
found = true
}
}
if !found {
t.Errorf("wrong error: %s - doesn't contain one of %q", err, allowed)
}

// This will fail with NoServers, but because there really is no
Expand Down
83 changes: 79 additions & 4 deletions x/ref/services/xproxy/xproxyd/proxyd_v23_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
responseVar = "RESPONSE" // Name of the variable used by client program to output the first response
responseVar1 = "RESPONSE1" // Name of the variable used by client program to output the second response
downloadSize = 64 * 1024 * 1024
using1OfMProxies = 1 // Use 1 out of the total available set of proxies when using all proxies.
using2OfMProxies = 2 // Use 2 out of the total available set of proxies when using all proxies.
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func TestV23MultipleProxyd(t *testing.T) {
firstProxyAddress, firstProxyLog, _, err := startServer(t, sh, serverName, 1, runServer, serverCreds)
assert("first proxy policy server", firstProxyLog)

allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
assert("all proxies policy server", allProxiesLog)

// Run all of the clients.
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestV23MultiProxyResilience(t *testing.T) {
assert("first two proxies", logsForProxies(first2)...)

// Start the server.
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
assert("server", serverLog)

// Run the client.
Expand Down Expand Up @@ -343,6 +344,74 @@ func TestV23MultiProxyResilience(t *testing.T) {

}

func TestV23MultiProxyExpansion(t *testing.T) {

v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()

var (
serverCreds = sh.ForkCredentials("server")
clientCreds = sh.ForkCredentials("client")
err error
)

assert := func(msg string, logs ...*bytes.Buffer) {
assertWithLog(t, err, msg, logs)
}

ns := v23.GetNamespace(sh.Ctx)
ns.CacheCtl(naming.DisableCache(true))

// Start a single proxy.
first, _, firstStatsAddrs := startInitialSetOfProxies(t, sh, 1)
assert("first proxy", logsForProxies(first)...)

// Start the server.
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using1OfMProxies, runServerAllProxiesNoLimit, serverCreds)
assert("server", serverLog)

// Run the client.
err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
assert("client")

// Gather stats and make sure the the server is using the first proxy.
ctx, err := v23.WithPrincipal(sh.Ctx, serverCreds.Principal)
assert("withPrincipal")

requests, _, _, err := gatherStats(ctx, firstStatsAddrs, serverAddress)
assert("gatherStats")

used := proxiesUsedForServer(requests, serverAddress)
if got, want := used, []int{0}; !reflect.DeepEqual(got, want) {
t.Fatalf("got %v, want %v", got, want)
}

// Start two more proxies and wait for the server to notice them.
second := startProxy(t, sh)
third := startProxy(t, sh)

_, err = waitForNMountedServers(t, sh.Ctx, ns, serverNameAll, 3)
assert("second and third proxies and server log", second.log, third.log, serverLog)

_, proxyStatsAddresses, err := waitForNProxies(t, sh.Ctx, ns, 3)
assert("wait for all three proxies to be in the mounttable")

// Run the client.
err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
assert("client with two proxies again")

requests, _, _, err = gatherStats(ctx, proxyStatsAddresses, serverAddress)
assert("gatherStats")

used = proxiesUsedForServer(requests, serverAddress)
if got, want := len(used), 3; !reflect.DeepEqual(got, want) {
t.Fatalf("got %v, want %v", got, want)
}

}

func TestV23SingleProxyResilience(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
Expand Down Expand Up @@ -701,11 +770,17 @@ var runServerRandomProxy = gosh.RegisterFunc(
"runServerRandomProxy",
createProxiedServer(serverNameRandom, proxyName, rpc.UseRandomProxy, 0),
)
var runServerAllProxies = gosh.RegisterFunc(
"runServerAllProxies",

var runServerAllProxiesLimit2 = gosh.RegisterFunc(
"runServerAllProxiesLimit2",
createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, using2OfMProxies),
)

var runServerAllProxiesNoLimit = gosh.RegisterFunc(
"runServerAllProxiesNoLimit",
createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, 0),
)

func createClient(serverName string, iterations int) func() error {
return func() error {
ctx, shutdown := test.V23Init()
Expand Down