Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type runOpts struct {

// Run downloads Envoy and runs it as a process with the arguments
// passed to it. Use RunOption for configuration options.
//
// This blocks until the context is done or the process exits. The error might be
// context.Canceled if the context is done or an error from the process.
func Run(ctx context.Context, args []string, options ...RunOption) error {
ro := &runOpts{
homeDir: globals.DefaultHomeDir,
Expand All @@ -92,20 +95,7 @@ func Run(ctx context.Context, args []string, options ...RunOption) error {
}

funcECmd := cmd.NewApp(&o)

funcERunArgs := []string{"func-e", "--platform", runtime.GOOS + "/" + runtime.GOARCH, "run"}
funcERunArgs = append(funcERunArgs, args...)

errChan := make(chan error)
go func() {
errChan <- funcECmd.RunContext(ctx, funcERunArgs)
}()

// Wait for run to exit or an explicit stop.
select {
case <-ctx.Done():
return nil
case err := <-errChan:
return err
}
return funcECmd.RunContext(ctx, funcERunArgs) // This will block until the context is done or the process exits.
}
30 changes: 17 additions & 13 deletions api/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"os"
"path/filepath"
"strconv"
"testing"
"time"

Expand All @@ -33,26 +34,29 @@ var (
)

func TestRunWithCtxDone(t *testing.T) {

tmpDir := t.TempDir()
envoyVersion := version.LastKnownEnvoy
versionsServer := test.RequireEnvoyVersionsTestServer(t, envoyVersion)
defer versionsServer.Close()
envoyVersionsURL := versionsServer.URL + "/envoy-versions.json"
b := bytes.NewBufferString("")

require.Equal(t, 0, b.Len())

ctx := context.Background()
// Use a very small ctx timeout
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
err := Run(ctx, runArgs, Out(b), HomeDir(tmpDir), EnvoyVersionsURL(envoyVersionsURL))
require.NoError(t, err)

require.NotEqual(t, 0, b.Len())
_, err = os.Stat(filepath.Join(tmpDir, "versions"))
err := Run(t.Context(), runArgs, HomeDir(tmpDir), EnvoyVersionsURL(envoyVersionsURL))
require.NoError(t, err)
// Run the same test multiple times to ensure that the Envoy process is cleaned up properly with the context cancellation.
for i := range 10 {
t.Run(strconv.Itoa(i), func(t *testing.T) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// This will return right after the context is done.
err := Run(ctx, []string{
"--log-level", "info",
"--config-yaml", "admin: {address: {socket_address: {address: '127.0.0.1', port_value: 9901}}}",
}, HomeDir(tmpDir), EnvoyVersionsURL(envoyVersionsURL))
require.Greater(t, time.Since(start).Seconds(), 2.0)
require.NoError(t, err) // If the address is already in use, the exit code will be 1.
})
}
}

func TestRunToCompletion(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion internal/cmd/run_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -60,7 +61,7 @@ func TestFuncERun(t *testing.T) {
// tee the error stream so we can look for the "starting main dispatch loop" line without consuming it.
errCopy := new(bytes.Buffer)
c.ErrWriter = io.MultiWriter(stderr, errCopy)
err := test.RequireRun(t, nil, &runner{c, stdout, stderr}, errCopy, args...)
err := test.RequireRun(t, 3*time.Second, &runner{c, stdout, stderr}, errCopy, args...)

require.NoError(t, err)
require.Empty(t, stdout)
Expand Down
47 changes: 19 additions & 28 deletions internal/envoy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"fmt"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/tetratelabs/func-e/internal/moreos"
)

// Run execs the binary at the path with the args passed. It is a blocking function that can be shutdown via SIGINT.
// Run execs the binary at the path with the args passed. It is a blocking function that can be shutdown via ctx.
//
// This will exit either `ctx` is done, or the process exits.
func (r *Runtime) Run(ctx context.Context, args []string) error {
// We can't use CommandContext even if that seems correct here. The reason is that we need to invoke shutdown hooks,
// and they expect the process to still be running. For example, this allows admin API hooks.
Expand All @@ -51,37 +51,28 @@ func (r *Runtime) Run(ctx context.Context, args []string) error {
// Warn, but don't fail if we can't write the pid file for some reason
r.maybeWarn(os.WriteFile(r.pidPath, []byte(strconv.Itoa(cmd.Process.Pid)), 0o600))

waitCtx, waitCancel := context.WithCancel(ctx)
defer waitCancel()

sigCtx, sigCancel := signal.NotifyContext(waitCtx, syscall.SIGINT, syscall.SIGTERM)
defer sigCancel()
r.FakeInterrupt = sigCancel

// Wait in a goroutine. We may need to kill the process if a signal occurs first.
//
// Note: do not wrap the original context, otherwise "<-cmdExitWait.Done()" won't block until the process exits
// if the original context is done.
cmdExitWait, cmdExit := context.WithCancel(context.Background())
defer cmdExit()
go func() {
defer waitCancel()
_ = r.cmd.Wait() // Envoy logs like "caught SIGINT" or "caught ENVOY_SIGTERM", so we don't repeat logging here.
defer cmdExit()
_ = r.cmd.Wait()
}()

awaitAdminAddress(sigCtx, r)

// Block until we receive SIGINT or are canceled because Envoy has died
<-sigCtx.Done()

// The process could have exited due to incorrect arguments or otherwise.
// If it is still running, run shutdown hooks and propagate the interrupt.
if cmd.ProcessState == nil {
r.handleShutdown(ctx)
}
awaitAdminAddress(ctx, r)

// At this point, shutdown hooks have run and Envoy is interrupted.
// Block until it exits to ensure file descriptors are closed prior to archival.
// Allow up to 5 seconds for a clean stop, killing if it can't for any reason.
// Block until the process exits or the original context is done.
select {
case <-waitCtx.Done(): // cmd.Wait goroutine finished
case <-time.After(5 * time.Second):
_ = moreos.EnsureProcessDone(r.cmd.Process)
case <-ctx.Done():
// When original context is done, we need to shutdown the process by ourselves.
// Run the shutdown hooks and wait for them to complete.
r.handleShutdown()
// Then wait for the process to exit.
<-cmdExitWait.Done()
case <-cmdExitWait.Done():
}

// Warn, but don't fail on error archiving the run directory
Expand Down
29 changes: 8 additions & 21 deletions internal/envoy/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ func TestRuntime_Run(t *testing.T) {
tests := []struct {
name string
args []string
shutdown func()
shutdown bool
timeout time.Duration
expectedStdout, expectedStderr string
expectedErr string
wantShutdownHook bool
}{
{
name: "func-e Ctrl+C",
args: []string{"-c", "envoy.yaml"},
name: "func-e Ctrl+C",
args: []string{"-c", "envoy.yaml"},
timeout: time.Second,
// Don't warn the user when they exited the process
expectedStdout: moreos.Sprintf("starting: %s -c envoy.yaml %s\n", fakeEnvoy, adminFlag),
expectedStderr: moreos.Sprintf("initializing epoch 0\nstarting main dispatch loop\ncaught SIGINT\nexiting\n"),
Expand All @@ -64,7 +66,6 @@ func TestRuntime_Run(t *testing.T) {
// Envoy returns exit status zero on anything except kill -9. We can't test kill -9 with a fake shell script.
{
name: "Envoy exited with error",
shutdown: func() { time.Sleep(time.Millisecond * 100) },
args: []string{}, // no config file!
expectedStdout: moreos.Sprintf("starting: %s %s\n", fakeEnvoy, adminFlag),
expectedStderr: moreos.Sprintf("initializing epoch 0\nexiting\nAt least one of --config-path or --config-yaml or Options::configProto() should be non-empty\n"),
Expand Down Expand Up @@ -102,15 +103,10 @@ func TestRuntime_Run(t *testing.T) {
return nil
})

shutdown := tc.shutdown
if shutdown == nil {
shutdown = ctrlC(r)
}

// tee the error stream so we can look for the "starting main dispatch loop" line without consuming it.
errCopy := new(bytes.Buffer)
r.Err = io.MultiWriter(r.Err, errCopy)
err := test.RequireRun(t, shutdown, r, errCopy, tc.args...)
err := test.RequireRun(t, tc.timeout, r, errCopy, tc.args...)

if tc.expectedErr == "" {
require.NoError(t, err)
Expand All @@ -128,8 +124,8 @@ func TestRuntime_Run(t *testing.T) {
require.Equal(t, tc.wantShutdownHook, haveShutdownHook)

// Validate we ran what we thought we did
require.Equal(t, tc.expectedStdout, stdout.String())
require.Equal(t, tc.expectedStderr, stderr.String())
require.Contains(t, stdout.String(), tc.expectedStdout)
require.Contains(t, stderr.String(), tc.expectedStderr)

// Ensure the working directory was deleted, and the "run" directory only contains the archive
files, err := os.ReadDir(runsDir)
Expand All @@ -144,15 +140,6 @@ func TestRuntime_Run(t *testing.T) {
}
}

func ctrlC(r *Runtime) func() {
return func() {
fakeInterrupt := r.FakeInterrupt
if fakeInterrupt != nil {
fakeInterrupt()
}
}
}

func requireEnvoyPid(t *testing.T, r *Runtime) int {
if r.cmd == nil || r.cmd.Process == nil {
t.Fatal("envoy process not yet started")
Expand Down
4 changes: 0 additions & 4 deletions internal/envoy/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ type Runtime struct {

adminAddress, adminAddressPath, pidPath string

// FakeInterrupt is exposed for unit tests to pretend "func-e run" received a Ctrl+C or Ctrl+Break.
// End-to-end tests should kill the func-e process to achieve the same.
FakeInterrupt context.CancelFunc

shutdownHooks []func(context.Context) error
}

Expand Down
4 changes: 2 additions & 2 deletions internal/envoy/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func (r *Runtime) RegisterShutdownHook(f func(context.Context) error) {
r.shutdownHooks = append(r.shutdownHooks, f)
}

func (r *Runtime) handleShutdown(ctx context.Context) {
func (r *Runtime) handleShutdown() {
defer r.interruptEnvoy() // Ensure the SIGINT forwards to Envoy even if a shutdown hook panics

deadline := time.Now().Add(shutdownTimeout)
timeout, cancel := context.WithDeadline(ctx, deadline)
timeout, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

moreos.Fprintf(r.Out, "invoking shutdown hooks with deadline %s\n", deadline.Format(dateFormat))
Expand Down
3 changes: 3 additions & 0 deletions internal/envoy/shutdown/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -65,6 +66,8 @@ func (e *envoyAdminDataCollection) retrieveAdminAPIData(ctx context.Context) err
file := filepath.Join(e.workingDir, f)

g.Go(func() error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return copyURLToFile(ctx, url, file)
})
}
Expand Down
8 changes: 2 additions & 6 deletions internal/envoy/shutdown/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -56,10 +57,5 @@ func runWithShutdownHook(t *testing.T, runDir string, hook func(r *envoy.Runtime
r.Err = stderr
require.NoError(t, hook(r))

return test.RequireRun(t, func() {
fakeInterrupt := r.FakeInterrupt
if fakeInterrupt != nil {
fakeInterrupt()
}
}, r, stderr, "-c", "envoy.yaml")
return test.RequireRun(t, 10*time.Second, r, stderr, "-c", "envoy.yaml")
}
17 changes: 8 additions & 9 deletions internal/test/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ type Runner interface {
}

// RequireRun executes Run on the given Runtime and calls shutdown after it started.
func RequireRun(t *testing.T, shutdown func(), r Runner, stderr io.Reader, args ...string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// If there's no shutdown function, shutdown via cancellation. This is similar to ctrl-c
if shutdown == nil {
shutdown = cancel
func RequireRun(t *testing.T, timeout time.Duration, r Runner, stderr io.Reader, args ...string) error {
var ctx context.Context
if timeout == 0 {
ctx = context.Background()
} else {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
}

// Run in a goroutine, and signal when that completes
Expand All @@ -64,8 +65,6 @@ func RequireRun(t *testing.T, shutdown func(), r Runner, stderr io.Reader, args
}
}

// Even if we had an error, we invoke the shutdown at this point to avoid leaking a process
shutdown()
<-ran // block until the runner finished
return err
}
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package main

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"syscall"

"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -50,7 +53,9 @@ func run(stdout, stderr io.Writer, args []string) int {
app.OnUsageError = func(c *cli.Context, err error, isSub bool) error {
return cmdutil.NewValidationError(err.Error())
}
if err := app.Run(args); err != nil {
sigCtx, sigCancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer sigCancel()
if err := app.RunContext(sigCtx, args); err != nil {
if _, ok := err.(*cmdutil.ValidationError); ok {
moreos.Fprintf(stderr, "%s\n", err)
logUsageError(app.Name, stderr)
Expand Down