From fcacfccf580ca1cca7f5f0c0c6c1bd726f205099 Mon Sep 17 00:00:00 2001 From: Cosmos Nicolaou Date: Tue, 9 Feb 2021 14:02:20 -0800 Subject: [PATCH 1/3] x/ref/runtime/internal/rpc: fix issue with proxy --- x/ref/runtime/internal/rpc/proxymgr.go | 40 ++++++++- x/ref/runtime/internal/rpc/proxymgr_test.go | 84 ++++++++++++++++--- .../xproxy/xproxyd/proxyd_v23_test.go | 83 +++++++++++++++++- 3 files changed, 186 insertions(+), 21 deletions(-) diff --git a/x/ref/runtime/internal/rpc/proxymgr.go b/x/ref/runtime/internal/rpc/proxymgr.go index 60d51a7cb..4e97c1f94 100644 --- a/x/ref/runtime/internal/rpc/proxymgr.go +++ b/x/ref/runtime/internal/rpc/proxymgr.go @@ -59,6 +59,9 @@ func newProxyManager(s serverProxyAPI, proxyName string, policy rpc.ProxyPolicy, } func (pm *proxyManager) selectRandomSubsetLocked(needed, available int) map[int]bool { + if needed == 0 { + return nil + } selected := map[int]bool{} for { candidate := pm.rand.Intn(available) @@ -160,6 +163,7 @@ func (pm *proxyManager) updateAvailableProxies(ctx *context.T) { pm.Lock() defer pm.Unlock() pm.proxies = updated + } func (pm *proxyManager) markActive(ep naming.Endpoint) { @@ -175,8 +179,6 @@ func (pm *proxyManager) markInActive(ep naming.Endpoint) { } func (pm *proxyManager) connectToSingleProxy(ctx *context.T, name string, ep naming.Endpoint) { - pm.markActive(ep) - defer pm.markInActive(ep) for delay := pm.reconnectDelay; ; delay = nextDelay(delay) { if !pm.isAvailable(ep) { ctx.Infof("connectToSingleProxy(%q): proxy is no longer available\n", ep) @@ -212,14 +214,26 @@ func (pm *proxyManager) tryConnections(ctx *context.T, notifyCh chan struct{}) b return false } for _, ep := range idle { + if !pm.canGrow() { + continue + } + pm.markActive(ep) go func(ep naming.Endpoint) { pm.connectToSingleProxy(ctx, pm.proxyName, ep) - notifyCh <- struct{}{} + pm.markInActive(ep) + sendNotify(notifyCh) }(ep) } return true } +func sendNotify(ch chan struct{}) { + select { + case ch <- struct{}{}: + default: + } +} + func drainNotifyChan(ch chan struct{}) { for { select { @@ -233,6 +247,24 @@ func drainNotifyChan(ch chan struct{}) { func (pm *proxyManager) manageProxyConnections(ctx *context.T) { notifyCh := make(chan struct{}, 10) pm.updateAvailableProxies(ctx) + // Watch for changes in the set of available proxies so that for the + // 'all' policy, the server will connect to new proxies as they appear. + // For other policies reconnection may be little faster since the + // new set of proxies is already available. + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(pm.resolveDelay): + } + pm.updateAvailableProxies(ctx) + if pm.shouldGrow() && pm.canGrow() { + sendNotify(notifyCh) + } + } + }() + for { select { case <-ctx.Done(): @@ -253,7 +285,7 @@ func (pm *proxyManager) manageProxyConnections(ctx *context.T) { drainNotifyChan(notifyCh) } // Wait for a change in the set of available proxies. - if pm.shouldGrow() { + if pm.shouldGrow() && !pm.canGrow() { for { select { case <-ctx.Done(): diff --git a/x/ref/runtime/internal/rpc/proxymgr_test.go b/x/ref/runtime/internal/rpc/proxymgr_test.go index f1c255a31..b59a494a0 100644 --- a/x/ref/runtime/internal/rpc/proxymgr_test.go +++ b/x/ref/runtime/internal/rpc/proxymgr_test.go @@ -322,6 +322,19 @@ func TestSingleProxyConnections(t *testing.T) { } } +func waitForExpected(sm *mockServerAPI, proxyName string, expected int) { + // Wait for the expected number of connections. + for { + time.Sleep(100 * time.Millisecond) + sm.Lock() + if len(sm.listening[proxyName]) == expected { + sm.Unlock() + break + } + sm.Unlock() + } +} + func TestMultipleProxyConnections(t *testing.T) { ctx, shutdown := test.V23Init() defer shutdown() @@ -363,13 +376,7 @@ func TestMultipleProxyConnections(t *testing.T) { time.Sleep(time.Millisecond * 100) sm.setEndpoints(proxyName, eps...) - // Wait for the expected number of connections. - for { - time.Sleep(100 * time.Millisecond) - if len(sm.listening[proxyName]) == tc.expected { - break - } - } + waitForExpected(sm, proxyName, tc.expected) // Remove the endpoints and finish the current listeners. sm.setEndpoints(proxyName) @@ -394,14 +401,65 @@ func TestMultipleProxyConnections(t *testing.T) { pm.updateAvailableProxies(ctx) // Wait for the expected number of connections. - for { - time.Sleep(100 * time.Millisecond) - if len(sm.listening[proxyName]) == tc.expected { - break - } - } + waitForExpected(sm, proxyName, tc.expected) + + cancel() + // Should immediately return if the context is already canceled. + pm.manageProxyConnections(cctx) + + wg.Wait() + } + +} + +func TestMultipleProxyConnectionExpansion(t *testing.T) { + ctx, shutdown := test.V23Init() + defer shutdown() + sm := newMockServer() + ep1 := newEndpoint("5000") + ep2 := newEndpoint("5001") + ep3 := newEndpoint("5002") + + fpm := newProxyManager(sm, "proxy0", rpc.UseFirstProxy, 1) + rpm := newProxyManager(sm, "proxy1", rpc.UseRandomProxy, 1) + apm := newProxyManager(sm, "proxy2", rpc.UseAllProxies, 3) + + for i, tc := range []struct { + pm *proxyManager + initial int + final int + }{ + {fpm, 1, 1}, + {rpm, 1, 1}, + {apm, 1, 3}, + } { + cctx, cancel := context.WithCancel(ctx) + pm := tc.pm + // tune down the delays + pm.resolveDelay = time.Millisecond + pm.reconnectDelay = time.Millisecond + proxyName := fmt.Sprintf("proxy%v", i) + ch := make(chan struct{}) + sm.setChan(proxyName, ch) + + var wg sync.WaitGroup + wg.Add(1) + go func(pm *proxyManager) { + pm.manageProxyConnections(cctx) + wg.Done() + }(pm) + + sm.setEndpoints(proxyName, ep1) + + waitForExpected(sm, proxyName, tc.initial) + sm.setEndpoints(proxyName, ep1, ep2, ep3) + time.Sleep(100 * time.Millisecond) + + // Wait for the expected number of connections. + waitForExpected(sm, proxyName, tc.final) cancel() + // Should immediately return if the context is already canceled. pm.manageProxyConnections(cctx) diff --git a/x/ref/services/xproxy/xproxyd/proxyd_v23_test.go b/x/ref/services/xproxy/xproxyd/proxyd_v23_test.go index 7495251ce..feab9e95c 100644 --- a/x/ref/services/xproxy/xproxyd/proxyd_v23_test.go +++ b/x/ref/services/xproxy/xproxyd/proxyd_v23_test.go @@ -46,6 +46,7 @@ const ( responseVar = "RESPONSE" // Name of the variable used by client program to output the first response responseVar1 = "RESPONSE1" // Name of the variable used by client program to output the second response downloadSize = 64 * 1024 * 1024 + using1OfMProxies = 1 // Use 1 out of the total available set of proxies when using all proxies. using2OfMProxies = 2 // Use 2 out of the total available set of proxies when using all proxies. ) @@ -106,7 +107,7 @@ func TestV23MultipleProxyd(t *testing.T) { firstProxyAddress, firstProxyLog, _, err := startServer(t, sh, serverName, 1, runServer, serverCreds) assert("first proxy policy server", firstProxyLog) - allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds) + allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds) assert("all proxies policy server", allProxiesLog) // Run all of the clients. @@ -279,7 +280,7 @@ func TestV23MultiProxyResilience(t *testing.T) { assert("first two proxies", logsForProxies(first2)...) // Start the server. - serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds) + serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds) assert("server", serverLog) // Run the client. @@ -343,6 +344,74 @@ func TestV23MultiProxyResilience(t *testing.T) { } +func TestV23MultiProxyExpansion(t *testing.T) { + + v23test.SkipUnlessRunningIntegrationTests(t) + sh := v23test.NewShell(t, nil) + defer sh.Cleanup() + sh.StartRootMountTable() + + var ( + serverCreds = sh.ForkCredentials("server") + clientCreds = sh.ForkCredentials("client") + err error + ) + + assert := func(msg string, logs ...*bytes.Buffer) { + assertWithLog(t, err, msg, logs) + } + + ns := v23.GetNamespace(sh.Ctx) + ns.CacheCtl(naming.DisableCache(true)) + + // Start a single proxy. + first, _, firstStatsAddrs := startInitialSetOfProxies(t, sh, 1) + assert("first proxy", logsForProxies(first)...) + + // Start the server. + serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using1OfMProxies, runServerAllProxiesNoLimit, serverCreds) + assert("server", serverLog) + + // Run the client. + err = runSingleClient(sh, runClientAllProxiesServer, clientCreds) + assert("client") + + // Gather stats and make sure the the server is using the first proxy. + ctx, err := v23.WithPrincipal(sh.Ctx, serverCreds.Principal) + assert("withPrincipal") + + requests, _, _, err := gatherStats(ctx, firstStatsAddrs, serverAddress) + assert("gatherStats") + + used := proxiesUsedForServer(requests, serverAddress) + if got, want := used, []int{0}; !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + + // Start two more proxies and wait for the server to notice them. + second := startProxy(t, sh) + third := startProxy(t, sh) + + _, err = waitForNMountedServers(t, sh.Ctx, ns, serverNameAll, 3) + assert("second and third proxies and server log", second.log, third.log, serverLog) + + _, proxyStatsAddresses, err := waitForNProxies(t, sh.Ctx, ns, 3) + assert("wait for all three proxies to be in the mounttable") + + // Run the client. + err = runSingleClient(sh, runClientAllProxiesServer, clientCreds) + assert("client with two proxies again") + + requests, _, _, err = gatherStats(ctx, proxyStatsAddresses, serverAddress) + assert("gatherStats") + + used = proxiesUsedForServer(requests, serverAddress) + if got, want := len(used), 3; !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + +} + func TestV23SingleProxyResilience(t *testing.T) { v23test.SkipUnlessRunningIntegrationTests(t) sh := v23test.NewShell(t, nil) @@ -701,11 +770,17 @@ var runServerRandomProxy = gosh.RegisterFunc( "runServerRandomProxy", createProxiedServer(serverNameRandom, proxyName, rpc.UseRandomProxy, 0), ) -var runServerAllProxies = gosh.RegisterFunc( - "runServerAllProxies", + +var runServerAllProxiesLimit2 = gosh.RegisterFunc( + "runServerAllProxiesLimit2", createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, using2OfMProxies), ) +var runServerAllProxiesNoLimit = gosh.RegisterFunc( + "runServerAllProxiesNoLimit", + createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, 0), +) + func createClient(serverName string, iterations int) func() error { return func() error { ctx, shutdown := test.V23Init() From 4b558a3617f59f04a425aff3cf62f6f327162f8a Mon Sep 17 00:00:00 2001 From: Cosmos Nicolaou Date: Tue, 9 Feb 2021 14:04:53 -0800 Subject: [PATCH 2/3] . --- x/ref/runtime/internal/rpc/test/client_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/x/ref/runtime/internal/rpc/test/client_test.go b/x/ref/runtime/internal/rpc/test/client_test.go index 4330cbf95..bc1e28a6c 100644 --- a/x/ref/runtime/internal/rpc/test/client_test.go +++ b/x/ref/runtime/internal/rpc/test/client_test.go @@ -295,8 +295,15 @@ func TestStartCallErrors(t *testing.T) { //nolint:gocyclo if !errors.Is(err, verror.ErrNoServers) { t.Errorf("wrong error: %s", err) } - if want := "connection refused"; !strings.Contains(verror.DebugString(err), want) { - t.Errorf("wrong error: %s - doesn't contain %q", err, want) + found := false + allowed := []string{"connection reset by peer", "connection refused"} + for _, want := range allowed { + if strings.Contains(verror.DebugString(err), want) { + found = true + } + } + if !found { + t.Errorf("wrong error: %s - doesn't contain one of %q", err, allowed) } // This will fail with NoServers, but because there really is no From f34e337827eab3f48d500bcba922d7d5294240b4 Mon Sep 17 00:00:00 2001 From: Cosmos Nicolaou Date: Tue, 9 Feb 2021 14:17:15 -0800 Subject: [PATCH 3/3] lint --- x/ref/runtime/internal/rpc/proxymgr.go | 28 ++++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/x/ref/runtime/internal/rpc/proxymgr.go b/x/ref/runtime/internal/rpc/proxymgr.go index 4e97c1f94..ccc31f358 100644 --- a/x/ref/runtime/internal/rpc/proxymgr.go +++ b/x/ref/runtime/internal/rpc/proxymgr.go @@ -244,6 +244,20 @@ func drainNotifyChan(ch chan struct{}) { } } +func (pm *proxyManager) watchForChanges(ctx *context.T, ch chan struct{}) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(pm.resolveDelay): + pm.updateAvailableProxies(ctx) + if pm.shouldGrow() && pm.canGrow() { + sendNotify(ch) + } + } + } +} + func (pm *proxyManager) manageProxyConnections(ctx *context.T) { notifyCh := make(chan struct{}, 10) pm.updateAvailableProxies(ctx) @@ -251,19 +265,7 @@ func (pm *proxyManager) manageProxyConnections(ctx *context.T) { // 'all' policy, the server will connect to new proxies as they appear. // For other policies reconnection may be little faster since the // new set of proxies is already available. - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(pm.resolveDelay): - } - pm.updateAvailableProxies(ctx) - if pm.shouldGrow() && pm.canGrow() { - sendNotify(notifyCh) - } - } - }() + go pm.watchForChanges(ctx, notifyCh) for { select {