From dfe8fe48ba02b5c9bf50720c124b928ccf812262 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 11 Jan 2022 19:15:21 +0000 Subject: [PATCH] [BEAM-13399] Move service liveness polling to Runner type --- .../core/runtime/xlangx/expansionx/process.go | 27 ++++++++--- .../runtime/xlangx/expansionx/process_test.go | 46 ++++++++++++++++++- .../test/integration/xlang/expansion_test.go | 11 ----- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go index 4b034eb32b9f..8428b92edcab 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go @@ -16,10 +16,13 @@ package expansionx import ( + "context" "fmt" "net" "os/exec" "time" + + "google.golang.org/grpc" ) // ExpansionServiceRunner is a type that holds information required to @@ -60,11 +63,24 @@ func (e *ExpansionServiceRunner) String() string { } // Endpoint returns the formatted endpoint the ExpansionServiceRunner is set to start the expansion -// service on. +// service on. func (e *ExpansionServiceRunner) Endpoint() string { return "localhost:" + e.servicePort } +func (e *ExpansionServiceRunner) pingEndpoint(timeout time.Duration) error { + ctx, canFunc := context.WithTimeout(context.Background(), timeout) + defer canFunc() + conn, err := grpc.DialContext(ctx, e.Endpoint(), grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return err + } + conn.Close() + return nil +} + +const connectionTimeout = 15 * time.Second + // StartService starts the expansion service for a given ExpansionServiceRunner. If this is // called and does not return an error, the expansion service will be running in the background // until StopService is called. This will leak resources if not addressed. @@ -73,11 +89,10 @@ func (e *ExpansionServiceRunner) StartService() error { if err != nil { return err } - // Start() is non-blocking so a brief sleep to let the JAR start up and begin accepting - // connections is necessary. - time.Sleep(2 * time.Second) - if e.serviceCommand.ProcessState != nil { - return fmt.Errorf("process %v exited when it should still be running", e.serviceCommand.Process) + + err = e.pingEndpoint(connectionTimeout) + if err != nil { + return err } return nil } diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go index 81ca7db8af08..1410d444c5c5 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go @@ -16,10 +16,14 @@ package expansionx import ( + "net" "os" "os/exec" "strings" "testing" + "time" + + "google.golang.org/grpc" ) func TestFindOpenPort(t *testing.T) { @@ -69,6 +73,44 @@ func TestEndpoint(t *testing.T) { } } +func makeAndArrangeMockGRPCService(t *testing.T, runner *ExpansionServiceRunner) error { + server := grpc.NewServer() + lis, err := net.Listen("tcp", runner.Endpoint()) + if err != nil { + return err + } + + go server.Serve(lis) + t.Cleanup(func() { server.Stop(); lis.Close() }) + return nil +} + +func TestPingEndpoint_bad(t *testing.T) { + serviceRunner, err := NewExpansionServiceRunner("", "") + if err != nil { + t.Fatalf("NewExpansionServiceRunner failed, got %v", err) + } + err = serviceRunner.pingEndpoint(1 * time.Second) + if err == nil { + t.Errorf("pingEndpoint succeeded when it should have failed") + } +} + +func TestPingEndpoint_good(t *testing.T) { + serviceRunner, err := NewExpansionServiceRunner("", "") + if err != nil { + t.Fatalf("NewExpansionServiceRunner failed, got %v", err) + } + err = makeAndArrangeMockGRPCService(t, serviceRunner) + if err != nil { + t.Fatalf("starting GRPC service failed, got %v", err) + } + err = serviceRunner.pingEndpoint(10 * time.Second) + if err != nil { + t.Errorf("pingEndpoint() failed, got %v", err) + } +} + func TestStartService_badCommand(t *testing.T) { serviceRunner, err := NewExpansionServiceRunner("", "") if err != nil { @@ -86,7 +128,9 @@ func TestStartService_good(t *testing.T) { if err != nil { t.Fatalf("NewExpansionServiceRunner failed, got %v", err) } - serviceRunner.serviceCommand = exec.Command("which", "go") + makeAndArrangeMockGRPCService(t, serviceRunner) + // Drop in a command that shouldn't error on its own + serviceRunner.serviceCommand = exec.Command("echo", "testing") err = serviceRunner.StartService() if err != nil { t.Errorf("StartService failed when it should have succeeded, got %v", err) diff --git a/sdks/go/test/integration/xlang/expansion_test.go b/sdks/go/test/integration/xlang/expansion_test.go index 2a9d2075beda..113e1c537844 100644 --- a/sdks/go/test/integration/xlang/expansion_test.go +++ b/sdks/go/test/integration/xlang/expansion_test.go @@ -16,14 +16,11 @@ package xlang import ( - "context" "os" "testing" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/test/integration" - "google.golang.org/grpc" ) const ( @@ -50,14 +47,6 @@ func TestAutomatedExpansionService(t *testing.T) { t.Errorf("failed to start expansion service JAR, got %v", err) } - ctx, canFunc := context.WithTimeout(context.Background(), 15*time.Second) - t.Cleanup(func() { canFunc() }) - conn, err := grpc.DialContext(ctx, serviceRunner.Endpoint(), grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - t.Fatalf("could not connect to endpoint %v, got %v", serviceRunner.Endpoint(), err) - } - conn.Close() - err = serviceRunner.StopService() if err != nil { t.Errorf("failed to stop expansion service JAR, got %v", err)