From 82f585374dd0acde9b8ff839f0e48cd93f83f359 Mon Sep 17 00:00:00 2001 From: Hamza El-Saawy Date: Wed, 2 Feb 2022 14:39:01 -0500 Subject: [PATCH 1/3] Adding proper shim shutdown Currently, Shutdown requests forcefully exit the binary without cleaning up resources and IO channels, or flushing logs. Added `.Done()` and `.IsShutdown()` methods to service to watch for service shutdown requests from containerd, and appropriately close background servers and goroutines. Added `NewService` method and creation options to properly initialize the `service` struct. Signed-off-by: Hamza El-Saawy --- cmd/containerd-shim-runhcs-v1/serve.go | 39 +++++++---- cmd/containerd-shim-runhcs-v1/service.go | 64 ++++++++++++++++++- .../service_internal.go | 13 ++-- .../service_internal_podshim_test.go | 51 +++++++++++++-- .../service_internal_taskshim_test.go | 52 +++++++++++++-- .../service_internal_test.go | 41 +++++++++++- 6 files changed, 230 insertions(+), 30 deletions(-) diff --git a/cmd/containerd-shim-runhcs-v1/serve.go b/cmd/containerd-shim-runhcs-v1/serve.go index ecef00c2e7..efc32fd9cd 100644 --- a/cmd/containerd-shim-runhcs-v1/serve.go +++ b/cmd/containerd-shim-runhcs-v1/serve.go @@ -169,11 +169,9 @@ var serveCommand = cli.Command{ ttrpcAddress := os.Getenv(ttrpcAddressEnv) ttrpcEventPublisher, err := newEventPublisher(ttrpcAddress) - if err != nil { return err } - defer func() { if err != nil { ttrpcEventPublisher.close() @@ -181,11 +179,13 @@ var serveCommand = cli.Command{ }() // Setup the ttrpc server - svc = &service{ - events: ttrpcEventPublisher, - tid: idFlag, - isSandbox: ctx.Bool("is-sandbox"), + svc, err = NewService(WithEventPublisher(ttrpcEventPublisher), + WithTID(idFlag), + WithIsSandbox(ctx.Bool("is-sandbox"))) + if err != nil { + return fmt.Errorf("starting service: %w", err) } + s, err := ttrpc.NewServer(ttrpc.WithUnaryServerInterceptor(octtrpc.ServerInterceptor())) if err != nil { return err @@ -204,10 +204,10 @@ var serveCommand = cli.Command{ serrs :=
make(chan error, 1) defer close(serrs) go func() { - // TODO: JTERRY75 We should use a real context with cancellation shared by - // the service for shim shutdown gracefully. - ctx := context.Background() - if err := trapClosedConnErr(s.Serve(ctx, sl)); err != nil { + // Serve loops infinitely unless s.Shutdown or s.Close are called. + // Passed in context is used as parent context for handling requests, + // but canceling it does not bring down ttrpc service. + if err := trapClosedConnErr(s.Serve(context.Background(), sl)); err != nil { logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") serrs <- err return @@ -221,7 +221,7 @@ var serveCommand = cli.Command{ case err := <-serrs: return err case <-time.After(2 * time.Millisecond): - // TODO: JTERRY75 this is terrible code. Contribue a change to + // TODO: JTERRY75 this is terrible code. Contribute a change to // ttrpc that you can: // // go func () { errs <- s.Serve() } @@ -232,12 +232,25 @@ var serveCommand = cli.Command{ // This is our best indication that we have not errored on creation // and are successfully serving the API. + // Closing stdout signals to containerd that shim started successfully os.Stdout.Close() } // Wait for the serve API to be shut down. 
- <-serrs - return nil + select { + case err = <-serrs: + // the ttrpc server shutdown without processing a shutdown request + case <-svc.Done(): + if !svc.GracefulShutdown { + // Return immediately, but still close ttrpc server, pipes, and spans + // Shouldn't need to os.Exit without clean up (ie, deferred `.Close()`s) + return nil + } + // Drain any remaining active ttrpc requests; times out after 200 ms + err = s.Shutdown(context.Background()) + } + + return err }, } diff --git a/cmd/containerd-shim-runhcs-v1/service.go b/cmd/containerd-shim-runhcs-v1/service.go index 0085168580..afdf82fc5e 100644 --- a/cmd/containerd-shim-runhcs-v1/service.go +++ b/cmd/containerd-shim-runhcs-v1/service.go @@ -18,7 +18,29 @@ import ( "go.opencensus.io/trace" ) -var _ = (task.TaskService)(&service{}) +type ServiceOptions struct { + Events publisher + TID string + IsSandbox bool +} + +type ServiceOption func(*ServiceOptions) + +func WithEventPublisher(e publisher) ServiceOption { + return func(o *ServiceOptions) { + o.Events = e + } +} +func WithTID(tid string) ServiceOption { + return func(o *ServiceOptions) { + o.TID = tid + } +} +func WithIsSandbox(s bool) ServiceOption { + return func(o *ServiceOptions) { + o.IsSandbox = s + } +} type service struct { events publisher @@ -42,10 +64,35 @@ type service struct { taskOrPod atomic.Value // cl is the create lock. Since each shim MUST only track a single task or - // POD. `cl` is used to create the task or POD sandbox. It SHOULD not be + // POD. `cl` is used to create the task or POD sandbox. It SHOULD NOT be // taken when creating tasks in a POD sandbox as they can happen // concurrently. 
cl sync.Mutex + + // shut is closed to signal a shut request is received + shut chan struct{} + // shutOnce is responsible for closign `shut` and any other necessary cleanup + shutOnce sync.Once + // GracefulShutdown dictates whether to shutdown gracefully and clean up resources + // or exit immediately + GracefulShutdown bool +} + +var _ = (task.TaskService)(&service{}) + +func NewService(o ...ServiceOption) (svc *service, err error) { + var opts ServiceOptions + for _, op := range o { + op(&opts) + } + + svc = &service{ + events: opts.Events, + tid: opts.TID, + isSandbox: opts.IsSandbox, + shut: make(chan struct{}), + } + return svc, nil } func (s *service) State(ctx context.Context, req *task.StateRequest) (resp *task.StateResponse, err error) { @@ -475,3 +522,16 @@ func (s *service) ComputeProcessorInfo(ctx context.Context, req *extendedtask.Co r, e := s.computeProcessorInfoInternal(ctx, req) return r, errdefs.ToGRPC(e) } + +func (s *service) Done() <-chan struct{} { + return s.shut +} + +func (s *service) IsShutdown() bool { + select { + case <-s.shut: + return true + default: + return false + } +} diff --git a/cmd/containerd-shim-runhcs-v1/service_internal.go b/cmd/containerd-shim-runhcs-v1/service_internal.go index 739531d3a1..ba98f63b61 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal.go @@ -447,12 +447,13 @@ func (s *service) shutdownInternal(ctx context.Context, req *task.ShutdownReques return empty, nil } - if req.Now { - os.Exit(0) - } - // TODO: JTERRY75 if we dont use `now` issue a Shutdown to the ttrpc - // connection to drain any active requests. - os.Exit(0) + s.shutOnce.Do(func() { + // TODO: should taskOrPod be deleted/set to nil? + // TODO: is there any extra leftovers of the shimTask/Pod to clean? ie: verify all handles are closed? 
+ s.GracefulShutdown = !req.Now + close(s.shut) + }) + return empty, nil } diff --git a/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go b/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go index dc316e6ea3..5ea0955833 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go @@ -2,9 +2,11 @@ package main import ( "context" + "fmt" "math/rand" "strconv" "testing" + "time" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats" @@ -16,11 +18,20 @@ import ( func setupPodServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShimTask, *testShimExec) { tid := strconv.Itoa(rand.Int()) - s := service{ - tid: tid, - isSandbox: true, + + s, err := NewService(WithTID(tid), WithIsSandbox(true)) + if err != nil { + t.Fatalf("could not create service: %v", err) } + // clean up the service + t.Cleanup(func() { + s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + ID: s.tid, + Now: true, + }) + }) + pod := &testShimPod{id: tid} // create init fake container @@ -45,7 +56,7 @@ func setupPodServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShimT pod.tasks.Store(task.id, task) pod.tasks.Store(task2.id, task2) s.taskOrPod.Store(pod) - return &s, task, task2, task2exec2 + return s, task, task2, task2exec2 } func Test_PodShim_getPod_NotCreated_Error(t *testing.T) { @@ -723,3 +734,35 @@ func Test_PodShim_statsInternal_2ndTaskID_Success(t *testing.T) { }) } } + +func Test_PodShim_shutdownInternal(t *testing.T) { + for _, now := range []bool{true, false} { + t.Run(fmt.Sprintf("%s_Now_%t", t.Name(), now), func(t *testing.T) { + s, _, _, _ := setupPodServiceWithFakes(t) + + if s.IsShutdown() { + t.Fatal("service prematurely shutdown") + } + + _, err := s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + ID: s.tid, + Now: now, + }) + if err != nil { + 
t.Fatalf("could not shut down service: %v", err) + } + + tm := time.NewTimer(5 * time.Millisecond) + select { + case <-tm.C: + t.Fatalf("shutdown channel did not close") + case <-s.Done(): + tm.Stop() + } + + if !s.IsShutdown() { + t.Fatal("service did not shutdown") + } + }) + } +} diff --git a/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go b/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go index 1ee8eb7157..e26f2e8760 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go @@ -2,9 +2,11 @@ package main import ( "context" + "fmt" "math/rand" "strconv" "testing" + "time" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats" @@ -16,10 +18,20 @@ import ( func setupTaskServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShimExec) { tid := strconv.Itoa(rand.Int()) - s := service{ - tid: tid, - isSandbox: false, + + s, err := NewService(WithTID(tid), WithIsSandbox(false)) + if err != nil { + t.Fatalf("could not create service: %v", err) } + + // clean up the service + t.Cleanup(func() { + s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + ID: s.tid, + Now: true, + }) + }) + task := &testShimTask{ id: tid, exec: newTestShimExec(tid, tid, 10), @@ -29,7 +41,7 @@ func setupTaskServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShim secondExec := newTestShimExec(tid, secondExecID, 101) task.execs[secondExecID] = secondExec s.taskOrPod.Store(task) - return &s, task, secondExec + return s, task, secondExec } func Test_TaskShim_getTask_NotCreated_Error(t *testing.T) { @@ -619,3 +631,35 @@ func Test_TaskShim_statsInternal_InitTaskID_Sucess(t *testing.T) { }) } } + +func Test_TaskShim_shutdownInternal(t *testing.T) { + for _, now := range []bool{true, false} { + t.Run(fmt.Sprintf("%s_Now_%t", t.Name(), now), func(t *testing.T) { + s, _, _ := 
setupTaskServiceWithFakes(t) + + if s.IsShutdown() { + t.Fatal("service prematurely shutdown") + } + + _, err := s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + ID: s.tid, + Now: now, + }) + if err != nil { + t.Fatalf("could not shut down service: %v", err) + } + + tm := time.NewTimer(5 * time.Millisecond) + select { + case <-tm.C: + t.Fatalf("shutdown channel did not close") + case <-s.Done(): + tm.Stop() + } + + if !s.IsShutdown() { + t.Fatal("service did not shutdown") + } + }) + } +} diff --git a/cmd/containerd-shim-runhcs-v1/service_internal_test.go b/cmd/containerd-shim-runhcs-v1/service_internal_test.go index a6a0075e1f..2d0e483963 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal_test.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal_test.go @@ -1,16 +1,20 @@ package main import ( + "context" + "fmt" "reflect" "testing" + "time" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats" v1 "github.com/containerd/cgroups/stats/v1" + "github.com/containerd/containerd/runtime/v2/task" "github.com/pkg/errors" ) func verifyExpectedError(t *testing.T, resp interface{}, actual, expected error) { - if actual == nil || errors.Cause(actual) != expected { + if actual == nil || errors.Cause(actual) != expected || !errors.Is(actual, expected) { t.Fatalf("expected error: %v, got: %v", expected, actual) } @@ -126,3 +130,38 @@ func verifyExpectedVirtualMachineStatistics(t *testing.T, v *stats.VirtualMachin t.Fatalf("expected VirtualMachineStatistics.Memory.WorkingSetBytes == 100, got: %d", v.Memory.WorkingSetBytes) } } + +func Test_Service_shutdownInternal(t *testing.T) { + for _, now := range []bool{true, false} { + t.Run(fmt.Sprintf("%s_Now_%t", t.Name(), now), func(t *testing.T) { + s, err := NewService(WithTID(t.Name())) + if err != nil { + t.Fatal(err) + } + + if s.IsShutdown() { + t.Fatal("service prematurely shutdown") + } + + _, err = s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + ID: s.tid, + Now: now, + 
}) + if err != nil { + t.Fatalf("could not shut down service: %v", err) + } + + tm := time.NewTimer(5 * time.Millisecond) + select { + case <-tm.C: + t.Fatalf("shutdown channel did not close") + case <-s.Done(): + tm.Stop() + } + + if !s.IsShutdown() { + t.Fatal("service did not shutdown") + } + }) + } +} From 707bd112c9197f5e29d79af74a5ba43dbddd876d Mon Sep 17 00:00:00 2001 From: Hamza El-Saawy Date: Wed, 2 Feb 2022 19:11:51 -0500 Subject: [PATCH 2/3] PR: ttrpc shutdown timeout ttrpc's Shutdown() has a 200ms ticker, not a timeout. Adding a proper timeout in case shutdown takes too long. Signed-off-by: Hamza El-Saawy --- cmd/containerd-shim-runhcs-v1/main.go | 4 ++++ cmd/containerd-shim-runhcs-v1/serve.go | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cmd/containerd-shim-runhcs-v1/main.go b/cmd/containerd-shim-runhcs-v1/main.go index e0ee3a4ac0..375b2dcb46 100644 --- a/cmd/containerd-shim-runhcs-v1/main.go +++ b/cmd/containerd-shim-runhcs-v1/main.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/Microsoft/go-winio/pkg/etw" "github.com/Microsoft/go-winio/pkg/etwlogrus" @@ -41,6 +42,9 @@ var ( containerdBinaryFlag string idFlag string + + // gracefulShutdownTimeout is how long to wait for clean-up before just exiting + gracefulShutdownTimeout = 3 * time.Second ) func etwCallback(sourceID guid.GUID, state etw.ProviderState, level etw.Level, matchAnyKeyword uint64, matchAllKeyword uint64, filterData uintptr) { diff --git a/cmd/containerd-shim-runhcs-v1/serve.go b/cmd/containerd-shim-runhcs-v1/serve.go index efc32fd9cd..468c47673e 100644 --- a/cmd/containerd-shim-runhcs-v1/serve.go +++ b/cmd/containerd-shim-runhcs-v1/serve.go @@ -221,8 +221,7 @@ var serveCommand = cli.Command{ case err := <-serrs: return err case <-time.After(2 * time.Millisecond): - // TODO: JTERRY75 this is terrible code. 
Contribute a change to - // ttrpc that you can: + // TODO: Contribute a change to ttrpc so that you can: // // go func () { errs <- s.Serve() } // select { @@ -246,8 +245,10 @@ var serveCommand = cli.Command{ // Shouldn't need to os.Exit without clean up (ie, deferred `.Close()`s) return nil } - // Drain any remaining active ttrpc requests; times out after 200 ms - err = s.Shutdown(context.Background()) + // currently the ttrpc shutdown is the only clean up to wait on + sctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout) + defer cancel() + err = s.Shutdown(sctx) } return err From 60d133f98161034dee5a161527168ced7b6fff9c Mon Sep 17 00:00:00 2001 From: Hamza El-Saawy Date: Thu, 3 Feb 2022 12:02:05 -0500 Subject: [PATCH 3/3] PR: error messages, naming, tests Checking return value of `shutdownInternal` for cleanup in service tests. Signed-off-by: Hamza El-Saawy --- cmd/containerd-shim-runhcs-v1/serve.go | 4 ++-- cmd/containerd-shim-runhcs-v1/service.go | 18 +++++++++--------- .../service_internal.go | 6 +++--- .../service_internal_podshim_test.go | 6 ++++-- .../service_internal_taskshim_test.go | 6 ++++-- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/cmd/containerd-shim-runhcs-v1/serve.go b/cmd/containerd-shim-runhcs-v1/serve.go index 468c47673e..6a973f1c3c 100644 --- a/cmd/containerd-shim-runhcs-v1/serve.go +++ b/cmd/containerd-shim-runhcs-v1/serve.go @@ -183,7 +183,7 @@ var serveCommand = cli.Command{ WithTID(idFlag), WithIsSandbox(ctx.Bool("is-sandbox"))) if err != nil { - return fmt.Errorf("starting service: %w", err) + return fmt.Errorf("failed to create new service: %w", err) } s, err := ttrpc.NewServer(ttrpc.WithUnaryServerInterceptor(octtrpc.ServerInterceptor())) @@ -240,7 +240,7 @@ var serveCommand = cli.Command{ case err = <-serrs: // the ttrpc server shutdown without processing a shutdown request case <-svc.Done(): - if !svc.GracefulShutdown { + if !svc.gracefulShutdown { // Return immediately, but still close 
ttrpc server, pipes, and spans // Shouldn't need to os.Exit without clean up (ie, deferred `.Close()`s) return nil diff --git a/cmd/containerd-shim-runhcs-v1/service.go b/cmd/containerd-shim-runhcs-v1/service.go index afdf82fc5e..c1a080eff5 100644 --- a/cmd/containerd-shim-runhcs-v1/service.go +++ b/cmd/containerd-shim-runhcs-v1/service.go @@ -69,13 +69,13 @@ type service struct { // concurrently. cl sync.Mutex - // shut is closed to signal a shut request is received - shut chan struct{} - // shutOnce is responsible for closign `shut` and any other necessary cleanup - shutOnce sync.Once - // GracefulShutdown dictates whether to shutdown gracefully and clean up resources + // shutdown is closed to signal a shutdown request is received + shutdown chan struct{} + // shutdownOnce is responsible for closing `shutdown` and any other necessary cleanup + shutdownOnce sync.Once + // gracefulShutdown dictates whether to shutdown gracefully and clean up resources // or exit immediately - GracefulShutdown bool + gracefulShutdown bool } var _ = (task.TaskService)(&service{}) @@ -90,7 +90,7 @@ func NewService(o ...ServiceOption) (svc *service, err error) { events: opts.Events, tid: opts.TID, isSandbox: opts.IsSandbox, - shut: make(chan struct{}), + shutdown: make(chan struct{}), } return svc, nil } @@ -524,12 +524,12 @@ func (s *service) ComputeProcessorInfo(ctx context.Context, req *extendedtask.Co } func (s *service) Done() <-chan struct{} { - return s.shut + return s.shutdown } func (s *service) IsShutdown() bool { select { - case <-s.shut: + case <-s.shutdown: return true default: return false diff --git a/cmd/containerd-shim-runhcs-v1/service_internal.go b/cmd/containerd-shim-runhcs-v1/service_internal.go index ba98f63b61..27c1a2cfad 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal.go @@ -447,11 +447,11 @@ func (s *service) shutdownInternal(ctx context.Context, req *task.ShutdownReques return empty, nil } - 
s.shutOnce.Do(func() { + s.shutdownOnce.Do(func() { // TODO: should taskOrPod be deleted/set to nil? // TODO: is there any extra leftovers of the shimTask/Pod to clean? ie: verify all handles are closed? - s.GracefulShutdown = !req.Now - close(s.shut) + s.gracefulShutdown = !req.Now + close(s.shutdown) }) return empty, nil diff --git a/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go b/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go index 5ea0955833..0d1fcc7766 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal_podshim_test.go @@ -26,10 +26,12 @@ func setupPodServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShimT // clean up the service t.Cleanup(func() { - s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + if _, err := s.shutdownInternal(context.Background(), &task.ShutdownRequest{ ID: s.tid, Now: true, - }) + }); err != nil { + t.Fatalf("could not shutdown service: %v", err) + } }) pod := &testShimPod{id: tid} diff --git a/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go b/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go index e26f2e8760..43fbaa66d1 100644 --- a/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go +++ b/cmd/containerd-shim-runhcs-v1/service_internal_taskshim_test.go @@ -26,10 +26,12 @@ func setupTaskServiceWithFakes(t *testing.T) (*service, *testShimTask, *testShim // clean up the service t.Cleanup(func() { - s.shutdownInternal(context.Background(), &task.ShutdownRequest{ + if _, err := s.shutdownInternal(context.Background(), &task.ShutdownRequest{ ID: s.tid, Now: true, - }) + }); err != nil { + t.Fatalf("could not shutdown service: %v", err) + } }) task := &testShimTask{