From fa39730f043fa823ea75c4903fc00a5ecd17edca Mon Sep 17 00:00:00 2001 From: Cosmos Nicolaou Date: Mon, 21 Jun 2021 10:10:33 -0700 Subject: [PATCH 1/3] x/ref/lib/aws/vxray: add support for obtaining eks/k8s data (#206) This PR adds support for obtaining the eks/kubernetes (k8s) cluster name, container id etc. Note, this PR is not specific to EKS and can be used for other k8s systems. It relies on the convention that the k8s cluster is configured to write a configmap with the cluster's name as one of the key/value pairs. The vxray setup allows for both the name of the configmap and the key for the cluster name to be configured. --- x/ref/examples/echo/bin/Dockerfile.echo | 4 + x/ref/examples/echo/bin/Dockerfile.echod | 4 + .../examples/echo/bin/Dockerfile.mounttabled | 4 + x/ref/examples/echo/echo/echo.go | 50 +++++++- x/ref/examples/echo/echod/echod.go | 8 +- x/ref/lib/aws/vxray/config.go | 107 ++++++++++++++--- x/ref/lib/aws/vxray/internal/eks_info.go | 111 ++++++++++++++++++ x/ref/lib/aws/vxray/manager.go | 59 +++++++--- 8 files changed, 307 insertions(+), 40 deletions(-) create mode 100644 x/ref/examples/echo/bin/Dockerfile.echo create mode 100644 x/ref/examples/echo/bin/Dockerfile.echod create mode 100644 x/ref/examples/echo/bin/Dockerfile.mounttabled create mode 100644 x/ref/lib/aws/vxray/internal/eks_info.go diff --git a/x/ref/examples/echo/bin/Dockerfile.echo b/x/ref/examples/echo/bin/Dockerfile.echo new file mode 100644 index 000000000..f047fafd7 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.echo @@ -0,0 +1,4 @@ +FROM alpine +COPY echo /bin/echo +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/echo"] diff --git a/x/ref/examples/echo/bin/Dockerfile.echod b/x/ref/examples/echo/bin/Dockerfile.echod new file mode 100644 index 000000000..8add85c92 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.echod @@ -0,0 +1,4 @@ +FROM alpine +COPY echod /bin/echod +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/echod"] diff --git 
a/x/ref/examples/echo/bin/Dockerfile.mounttabled b/x/ref/examples/echo/bin/Dockerfile.mounttabled new file mode 100644 index 000000000..550171cd8 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.mounttabled @@ -0,0 +1,4 @@ +FROM alpine +COPY mounttabled /bin/mounttabled +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/mounttabled"] diff --git a/x/ref/examples/echo/echo/echo.go b/x/ref/examples/echo/echo/echo.go index 066372583..8e9a8d591 100644 --- a/x/ref/examples/echo/echo/echo.go +++ b/x/ref/examples/echo/echo/echo.go @@ -6,6 +6,7 @@ package main import ( + "errors" "flag" "fmt" "io" @@ -18,6 +19,7 @@ import ( "github.com/aws/aws-xray-sdk-go/xray" v23 "v.io/v23" "v.io/v23/context" + "v.io/v23/naming" "v.io/v23/vtrace" "v.io/x/ref/examples/echo" "v.io/x/ref/lib/aws/vxray" @@ -50,7 +52,20 @@ func main() { ctx, shutdown := v23.Init() defer shutdown() - ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true)) + ctx, _ = vxray.InitXRay(ctx, + v23.GetRuntimeFlags().VtraceFlags, + xray.Config{ServiceVersion: ""}, + vxray.EC2Plugin(), + vxray.EKSCluster(), + vxray.ContainerIDAndHost(), + vxray.MergeLogging(true)) + + servers := strings.Split(serverFlag, ",") + if len(servers) > 0 { + ctx.Infof("waiting for: %v servers: %v", len(servers), servers) + waitForServers(ctx, servers) + ctx.Infof("servers ready: %v", servers) + } client := echo.EchoServiceClient(nameFlag) @@ -59,7 +74,7 @@ func main() { if len(httpAddr) > 0 { wg.Add(1) go func() { - runHttpServer(ctx, httpAddr, client) + runHTTPServer(ctx, httpAddr, client) wg.Done() }() } @@ -73,7 +88,6 @@ func main() { close(done) }() - servers := strings.Split(serverFlag, ",") samplingRequest := &vtrace.SamplingRequest{ Name: nameFlag, } @@ -106,7 +120,7 @@ func main() { }() select { case <-done: - time.Sleep(1) + time.Sleep(time.Second * 2) case <-signals.ShutdownOnSignals(ctx): } } @@ -115,7 +129,7 @@ func main() { // /call and 
/call?forward-to= // will issue RPCs to the echo server. // /quit will cause the client to exit gracefully. -func runHttpServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) { +func runHTTPServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) { xrayHandler := xray.Handler( xray.NewFixedSegmentNamer("http.echo.client"), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -169,7 +183,7 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer, } result, err := client.Ping(ctx, now, servers) if err != nil { - ctx.Errorf("%v.%v failed: %v", nameFlag, "ping", err) + ctx.Errorf("%v.%v failed: %v", servers, "ping", err) } if len(result) < 100 { fmt.Fprintln(out, result) @@ -178,3 +192,27 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer, } return err } + +func waitForServers(ctx *context.T, servers []string) { + var wg sync.WaitGroup + wg.Add(len(servers)) + ns := v23.GetNamespace(ctx) + for _, server := range servers { + go func(server string) { + for { + _, err := ns.Resolve(ctx, server) + ctx.Infof("%v: %v: %v", server, err, errors.Is(err, naming.ErrNoSuchName)) + if errors.Is(err, naming.ErrNoSuchName) { + time.Sleep(time.Second) + continue + } + if err == nil { + break + } + ctx.Infof("%v: %v\n", server, err) + } + wg.Done() + }(server) + } + wg.Wait() +} diff --git a/x/ref/examples/echo/echod/echod.go b/x/ref/examples/echo/echod/echod.go index 6153e51d0..785f4931b 100644 --- a/x/ref/examples/echo/echod/echod.go +++ b/x/ref/examples/echo/echod/echod.go @@ -76,7 +76,13 @@ func main() { ctx, shutdown := v23.Init() defer shutdown() - ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true)) + ctx, _ = vxray.InitXRay(ctx, + v23.GetRuntimeFlags().VtraceFlags, + xray.Config{ServiceVersion: ""}, + vxray.EC2Plugin(), + vxray.EKSCluster(), + vxray.ContainerIDAndHost(), + 
vxray.MergeLogging(true)) ctx, server, err := v23.WithNewServer(ctx, nameFlag, echo.EchoServiceServer(&echod{}), securityflag.NewAuthorizerOrDie(ctx)) if err != nil { diff --git a/x/ref/lib/aws/vxray/config.go b/x/ref/lib/aws/vxray/config.go index 1efeba47c..6dd038eb2 100644 --- a/x/ref/lib/aws/vxray/config.go +++ b/x/ref/lib/aws/vxray/config.go @@ -9,6 +9,7 @@ package vxray import ( "fmt" + "os" "github.com/aws/aws-xray-sdk-go/awsplugins/beanstalk" "github.com/aws/aws-xray-sdk-go/awsplugins/ec2" @@ -18,15 +19,18 @@ import ( "v.io/v23/context" "v.io/v23/logging" "v.io/v23/vtrace" + "v.io/x/ref/lib/aws/vxray/internal" "v.io/x/ref/lib/flags" libvtrace "v.io/x/ref/lib/vtrace" ) type options struct { - mergeLogging bool - mapToHTTP bool - newStore bool - newStoreFlags flags.VtraceFlags + mergeLogging bool + mapToHTTP bool + newStore bool + newStoreFlags flags.VtraceFlags + configMap, configMapKey string + containerized bool } // Option represents an option to InitXRay. @@ -53,6 +57,44 @@ func BeanstalkPlugin() Option { } } +// KubernetesCluster configures obtaining information about the process' +// current environment when running under Kubernetes (k8s), whether managed by +// AWS EKS or any other control plane implementation. It requires that the +// K8S configuration creates a configmap that contains the cluster name. +// The configMap argument names that configmap and configMapKey +// is the key in that configmap for the cluster name. For example, when using +// the AWS cloudwatch/insights/xray-daemon daemonset the values for those +// would be: +// /api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info +// cluster.name +// +// When configured, xray segments will contain a 'cluster_name' annotation. +func KubernetesCluster(configMap, configMapKey string) Option { + return func(o *options) { + o.configMap, o.configMapKey = configMap, configMapKey + } +} + +// EKSCluster calls KubernetesCluster with the values commonly used +// with EKS clusters. 
+func EKSCluster() Option { + return KubernetesCluster("/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info", "cluster.name") +} + +// ContainerIDAndHost requests that container id and host information be +// obtained and added to traces. The container id is obtained by parsing +// the /proc/self/cgroup file, and the host by call the operating system's +// hostname function. When running under kubernetes for example, a pod's +// name is configured as its hostname. +// +// When configured, xray segments will contain 'container_id' and 'container_host' +// annotations. +func ContainerIDAndHost() Option { + return func(o *options) { + o.containerized = true + } +} + // MergeLogging arrays for xray logging messages to be merged with vanadium // log messages. func MergeLogging(v bool) Option { @@ -105,44 +147,73 @@ func (xl *xraylogger) Log(level xraylog.LogLevel, msg fmt.Stringer) { } } -func initXRay(ctx *context.T, config xray.Config, opts []Option) (*context.T, *options, error) { - o := &options{mapToHTTP: true} - for _, fn := range opts { - fn(o) - } +func (m *manager) initXRay(ctx *context.T, config xray.Config) (*context.T, error) { if err := xray.Configure(config); err != nil { ctx.Errorf("failed to configure xray context: %v", err) - return ctx, nil, err + return ctx, err } - if o.mergeLogging { + if m.options.mergeLogging { xray.SetLogger(&xraylogger{context.LoggerFromContext(ctx)}) } ctx, err := WithConfig(ctx, config) - return ctx, o, err + return ctx, err } // InitXRay configures the AWS xray service and returns a context containing // the xray configuration. This should only be called once. The vflags argument // is used solely to check if xray tracing is enabled and not to create a -// new vtrace.Store, if a new store is required, the +// new vtrace.Store, if a new/alternate store is required, the WithNewStore option +// should be used to specify the store to be used. 
func InitXRay(ctx *context.T, vflags flags.VtraceFlags, config xray.Config, opts ...Option) (*context.T, error) { if !vflags.EnableAWSXRay { return ctx, nil } octx := ctx - ctx, options, err := initXRay(ctx, config, opts) + mgr := &manager{} + mgr.options.mapToHTTP = true + for _, fn := range opts { + fn(&mgr.options) + } + ctx, err := mgr.initXRay(ctx, config) if err != nil { return octx, err } - - if options.newStore { - store, err := libvtrace.NewStore(options.newStoreFlags) + if mgr.options.newStore { + store, err := libvtrace.NewStore(mgr.options.newStoreFlags) if err != nil { return octx, err } ctx = vtrace.WithStore(ctx, store) } - mgr := &manager{mapToHTTP: options.mapToHTTP} + if mgr.options.containerized { + if hostNameErr == nil { + mgr.containerHost = hostName + } else { + ctx.Infof("failed to obtain host name from: %v", hostNameErr) + } + cgroupFile := "/proc/self/cgroup" + if cid, err := internal.GetContainerID(cgroupFile); err == nil { + mgr.containerID = cid + } else { + ctx.Infof("failed to obtain container id", err) + } + } + if cm := mgr.options.configMap; len(cm) > 0 { + if clusterName, err := internal.GetEKSClusterName(ctx, cm, mgr.options.configMapKey); err == nil { + mgr.clusterName = clusterName + } else { + ctx.Infof("failed to obtain cluster name from %v.%v: %v", cm, mgr.options.configMapKey, err) + } + } ctx = vtrace.WithManager(ctx, mgr) return ctx, nil } + +var ( + hostName string + hostNameErr error +) + +func init() { + hostName, hostNameErr = os.Hostname() +} diff --git a/x/ref/lib/aws/vxray/internal/eks_info.go b/x/ref/lib/aws/vxray/internal/eks_info.go new file mode 100644 index 000000000..f8f277c98 --- /dev/null +++ b/x/ref/lib/aws/vxray/internal/eks_info.go @@ -0,0 +1,111 @@ +package internal + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" +) + +const ( + k8sServiceAccountPrefix = 
"/var/run/secrets/kubernetes.io/serviceaccount" + k8sCert = "ca.crt" + k8sToken = "token" + k8sAPIHost = "kubernetes.default.svc" + containerIDLen = 64 +) + +func GetEKSClusterName(ctx context.Context, configMap, keyName string) (string, error) { + cm, err := getConfigMap(ctx, configMap) + if err != nil { + return "", err + } + name, ok := cm[keyName] + if !ok { + return "", fmt.Errorf("cluster name key %v not found", keyName) + } + return name, nil +} + +func getConfigMap(ctx context.Context, configMap string) (map[string]string, error) { + rootPEM, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sCert)) + if err != nil { + return nil, err + } + token, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sToken)) + if err != nil { + return nil, err + } + + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(rootPEM) + if !ok { + panic("failed to parse root certificate") + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: roots, + }, + } + client := &http.Client{Transport: tr} + u := &url.URL{ + Scheme: "https", + Host: k8sAPIHost, + Path: configMap, + } + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+string(token)) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + buf := bytes.NewBuffer(make([]byte, 0, 1024)) + io.Copy(buf, resp.Body) + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s: ERROR: %v", buf.String(), resp.StatusCode) + } + cm := struct { + API string `json:"apiVersion"` + Data map[string]string `json:"data"` + }{} + if err := json.Unmarshal(buf.Bytes(), &cm); err != nil { + return nil, fmt.Errorf("%s: %v", buf.String(), err) + } + if cm.API != "v1" && len(cm.Data) == 0 { + return nil, fmt.Errorf("API version has changed to %v: found no config map data", cm.API) + + } + return cm.Data, nil +} + +func GetContainerID(cgroupFile string) 
(string, error) { + rd, err := os.Open(cgroupFile) + if err != nil { + return "", err + } + sc := bufio.NewScanner(rd) + cid := "" + for sc.Scan() { + line := sc.Text() + if l := len(line); l > containerIDLen { + cid = line[l-containerIDLen:] + break + } + } + if len(cid) == 0 { + return "", fmt.Errorf("failed to find a container id in %v", cgroupFile) + } + return cid, nil +} diff --git a/x/ref/lib/aws/vxray/manager.go b/x/ref/lib/aws/vxray/manager.go index 8156b690a..ad0380b09 100644 --- a/x/ref/lib/aws/vxray/manager.go +++ b/x/ref/lib/aws/vxray/manager.go @@ -25,15 +25,43 @@ import ( // Manager allows you to create new traces and spans and access the // vtrace store that type manager struct { - mapToHTTP bool + options options + clusterName string + containerID string + containerHost string +} + +func annotateIfSet(a map[string]interface{}, k, v string) { + if len(v) == 0 { + return + } + a[k] = v +} + +func (m *manager) annotateSegment(seg *xray.Segment, subseg bool) { + if seg.Annotations == nil { + return + } + if m.options.mapToHTTP && !subseg && (seg.HTTP == nil || seg.HTTP.Request == nil) { + // Only create fake http fields if they have not already been set. 
+ hd := seg.GetHTTP() + req := hd.GetRequest() + mapAnnotation(seg.Annotations, "name", &req.URL) + mapAnnotation(seg.Annotations, "method", &req.Method) + mapAnnotation(seg.Annotations, "clientAddr", &req.ClientIP) + req.UserAgent = "vanadium" + } + annotateIfSet(seg.Annotations, "cluster_name", m.clusterName) + annotateIfSet(seg.Annotations, "container_id", m.containerID) + annotateIfSet(seg.Annotations, "container_host", m.containerHost) } type xrayspan struct { vtrace.Span - ctx *context.T - subseg bool - mapToHTTP bool - seg *xray.Segment + ctx *context.T + subseg bool + mgr *manager + seg *xray.Segment } func (xs *xrayspan) Annotate(msg string) { @@ -93,7 +121,8 @@ func (xs *xrayspan) Finish(err error) { xs.seg.AddMetadataToNamespace("vtrace", "error", err.Error()) } xseg := xs.seg - if an := xseg.Annotations; !xs.subseg && xs.mapToHTTP && an != nil { + xs.mgr.annotateSegment(xseg, xs.subseg) + /* if an := xseg.Annotations; !xs.subseg && xs.mapToHTTP && an != nil { if xseg.HTTP == nil || xseg.HTTP.Request == nil { // Only create fake http fields if they have not already been set. hd := xseg.GetHTTP() @@ -103,7 +132,7 @@ func (xs *xrayspan) Finish(err error) { mapAnnotation(an, "clientAddr", &req.ClientIP) req.UserAgent = "vanadium" } - } + }*/ if xs.subseg { xseg.CloseAndStream(err) } else { @@ -117,7 +146,7 @@ func (xs *xrayspan) Finish(err error) { // other span. This is useful when starting operations that are // disconnected from the activity ctx is performing. For example // this might be used to start background tasks. 
-func (m manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest) (*context.T, vtrace.Span) { +func (m *manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest) (*context.T, vtrace.Span) { id, err := uniqueid.Random() if err != nil { ctx.Errorf("vtrace: couldn't generate Trace Id, debug data may be lost: %v", err) @@ -135,7 +164,7 @@ func (m manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRe sampling := translateSamplingHeader(sr) _, seg := xray.NewSegmentFromHeader(ctx, name, sampling, hdr) ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg} return vtrace.WithSpan(ctx, xs), xs } @@ -219,7 +248,7 @@ func translateSamplingHeader(sr *vtrace.SamplingRequest) *http.Request { // a trace from a remote server. name is the name of the new span and // req contains the parameters needed to connect this span with it's // trace. -func (m manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest, req vtrace.Request) (*context.T, vtrace.Span) { +func (m *manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest, req vtrace.Request) (*context.T, vtrace.Span) { st := vtrace.GetStore(ctx) if st == nil { panic("nil store") @@ -249,13 +278,13 @@ func (m manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.Samp } ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg, subseg: sub} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub} return vtrace.WithSpan(ctx, xs), xs } // WithNewSpan derives a context with a new Span that can be used to // trace and annotate operations across process boundaries. 
-func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) { +func (m *manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) { if curSpan := vtrace.GetSpan(ctx); curSpan != nil { if curSpan.Store() == nil { panic("nil store") @@ -267,7 +296,7 @@ func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Sp seg, sub := newSegment(ctx, name, nil) ctx.VI(1).Infof("WithNewSpan: new seg: %v", segStr(seg)) ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg, subseg: sub} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub} return vtrace.WithSpan(ctx, xs), xs } ctx.Error("vtrace: creating a new child span from context with no existing span.") @@ -275,7 +304,7 @@ func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Sp } // Request generates a vtrace.Request from the active Span. -func (m manager) GetRequest(ctx *context.T) vtrace.Request { +func (m *manager) GetRequest(ctx *context.T) vtrace.Request { if span := vtrace.GetSpan(ctx); span != nil { req := span.Request(ctx) if seg := GetSegment(ctx); seg != nil { @@ -288,7 +317,7 @@ func (m manager) GetRequest(ctx *context.T) vtrace.Request { } // Response captures the vtrace.Response for the active Span. -func (m manager) GetResponse(ctx *context.T) vtrace.Response { +func (m *manager) GetResponse(ctx *context.T) vtrace.Response { if span := vtrace.GetSpan(ctx); span != nil { return vtrace.Response{ Flags: span.Store().Flags(span.Trace()), From b2e0a54eb08f0b392ceb12146b06b8846646dd51 Mon Sep 17 00:00:00 2001 From: Richard Huang Date: Thu, 22 Jul 2021 02:51:16 -0700 Subject: [PATCH 2/3] v23/context: fix goroutine leak in WithRootCancel (#214) Whenever a child context is created via `WithRootCancel`, a goroutine spawns to gracefully handle closing the child context whenever the root context gets canceled. 
However, the current implementation leaks goroutines whenever the child context exits before the root context exits. This pull request looks to fix the problem by exiting the goroutine whenever the child context is done. See `TestRootCancelGoroutineLeak` for the reproduction use case and test that demonstrates the leak. This is also easily visible via `pprof` using the `goroutine` module. Co-authored-by: Richard Huang --- v23/context/context.go | 8 ++++++-- v23/context/context_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/v23/context/context.go b/v23/context/context.go index 34cad72e8..2c3618598 100644 --- a/v23/context/context.go +++ b/v23/context/context.go @@ -303,8 +303,12 @@ func WithRootCancel(parent *T) (*T, CancelFunc) { // Forward the cancelation from the root context to the newly // created context. go func() { - <-rootCtx.Done() - cancel() + select { + case <-rootCtx.Done(): + cancel() + case <-ctx.Done(): + cancel() + } }() } else if atomic.AddInt32(&nRootCancelWarning, 1) < 3 { vlog.Errorf("context.WithRootCancel: context %+v is not derived from root v23 context.\n", parent) diff --git a/v23/context/context_test.go b/v23/context/context_test.go index 02fbb0b18..167f9d2d6 100644 --- a/v23/context/context_test.go +++ b/v23/context/context_test.go @@ -9,6 +9,8 @@ import ( gocontext "context" "fmt" "os" + "runtime" + "strings" "sync" "testing" "time" @@ -320,6 +322,36 @@ func TestRootCancelChain(t *testing.T) { } } +func TestRootCancelGoroutineLeak(t *testing.T) { + rootCtx, rootcancel := context.RootContext() + const iterations = 1024 + for i := 0; i != iterations; i++ { + _, cancel := context.WithRootCancel(rootCtx) + cancel() + } + + // Arbitrary threshold to wait for the goroutines in the created contexts + // above to exit. 
This threshold was arbitrarily created after running + // `go test -count=10000 -run TestRootCancelGoroutineLeak$` and verifying + // that the tests did not fail flakily. + const waitThreshold = 8*time.Millisecond + time.Sleep(waitThreshold) + + // Verify that goroutines no longer exist in the runtime stack. + buf := make([]byte, 2<<20) + buf = buf[:runtime.Stack(buf, true)] + count := 0 + for _, g := range strings.Split(string(buf), "\n\n") { + if strings.Contains(g, "v.io/v23/context.WithRootCancel.func1") { + count++ + } + } + if count != 0 { + t.Errorf("expected 0 but got %d: goroutine leaking in WithRootCancel", count) + } + rootcancel() +} + func TestRootCancel_GoContext(t *testing.T) { root, rootcancel := context.RootContext() From 297ba726332cf327c72722a51079cee9205efc6e Mon Sep 17 00:00:00 2001 From: Cosmos Nicolaou Date: Thu, 22 Jul 2021 18:50:52 +0200 Subject: [PATCH 3/3] v23/context: fix lint complaint (#215) --- v23/context/context_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v23/context/context_test.go b/v23/context/context_test.go index 167f9d2d6..4dbbc72c1 100644 --- a/v23/context/context_test.go +++ b/v23/context/context_test.go @@ -334,7 +334,7 @@ func TestRootCancelGoroutineLeak(t *testing.T) { // above to exit. This threshold was arbitrarily created after running // `go test -count=10000 -run TestRootCancelGoroutineLeak$` and verifying // that the tests did not fail flakily. - const waitThreshold = 8*time.Millisecond + const waitThreshold = 8 * time.Millisecond time.Sleep(waitThreshold) // Verify that goroutines no longer exist in the runtime stack.