diff --git a/x/ref/examples/echo/bin/Dockerfile.echo b/x/ref/examples/echo/bin/Dockerfile.echo new file mode 100644 index 000000000..f047fafd7 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.echo @@ -0,0 +1,4 @@ +FROM alpine +COPY echo /bin/echo +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/echo"] diff --git a/x/ref/examples/echo/bin/Dockerfile.echod b/x/ref/examples/echo/bin/Dockerfile.echod new file mode 100644 index 000000000..8add85c92 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.echod @@ -0,0 +1,4 @@ +FROM alpine +COPY echod /bin/echod +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/echod"] diff --git a/x/ref/examples/echo/bin/Dockerfile.mounttabled b/x/ref/examples/echo/bin/Dockerfile.mounttabled new file mode 100644 index 000000000..550171cd8 --- /dev/null +++ b/x/ref/examples/echo/bin/Dockerfile.mounttabled @@ -0,0 +1,4 @@ +FROM alpine +COPY mounttabled /bin/mounttabled +COPY creds/ /bin/creds/ +ENTRYPOINT ["/bin/mounttabled"] diff --git a/x/ref/examples/echo/echo/echo.go b/x/ref/examples/echo/echo/echo.go index 066372583..8e9a8d591 100644 --- a/x/ref/examples/echo/echo/echo.go +++ b/x/ref/examples/echo/echo/echo.go @@ -6,6 +6,7 @@ package main import ( + "errors" "flag" "fmt" "io" @@ -18,6 +19,7 @@ import ( "github.com/aws/aws-xray-sdk-go/xray" v23 "v.io/v23" "v.io/v23/context" + "v.io/v23/naming" "v.io/v23/vtrace" "v.io/x/ref/examples/echo" "v.io/x/ref/lib/aws/vxray" @@ -50,7 +52,20 @@ func main() { ctx, shutdown := v23.Init() defer shutdown() - ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true)) + ctx, _ = vxray.InitXRay(ctx, + v23.GetRuntimeFlags().VtraceFlags, + xray.Config{ServiceVersion: ""}, + vxray.EC2Plugin(), + vxray.EKSCluster(), + vxray.ContainerIDAndHost(), + vxray.MergeLogging(true)) + + servers := strings.Split(serverFlag, ",") + if len(servers) > 0 { + ctx.Infof("waiting for: %v servers: %v", len(servers), servers) + waitForServers(ctx, servers) + ctx.Infof("servers ready: %v", servers) + } client := echo.EchoServiceClient(nameFlag) @@ -59,7 +74,7 @@ func main() { if len(httpAddr) > 0 { wg.Add(1) go func() { - runHttpServer(ctx, httpAddr, client) + runHTTPServer(ctx, httpAddr, client) wg.Done() }() } @@ -73,7 +88,6 @@ func main() { close(done) }() - servers := strings.Split(serverFlag, ",") samplingRequest := &vtrace.SamplingRequest{ Name: nameFlag, } @@ -106,7 +120,7 @@ func main() { }() select { case <-done: - time.Sleep(1) + time.Sleep(time.Second * 2) case <-signals.ShutdownOnSignals(ctx): } } @@ -115,7 +129,7 @@ func main() { // /call and /call?forward-to= // will issue RPCs to the echo server. // /quit will cause the client to exit gracefully. -func runHttpServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) { +func runHTTPServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) { xrayHandler := xray.Handler( xray.NewFixedSegmentNamer("http.echo.client"), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -169,7 +183,7 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer, } result, err := client.Ping(ctx, now, servers) if err != nil { - ctx.Errorf("%v.%v failed: %v", nameFlag, "ping", err) + ctx.Errorf("%v.%v failed: %v", servers, "ping", err) } if len(result) < 100 { fmt.Fprintln(out, result) @@ -178,3 +192,27 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer, } return err } + +func waitForServers(ctx *context.T, servers []string) { + var wg sync.WaitGroup + wg.Add(len(servers)) + ns := v23.GetNamespace(ctx) + for _, server := range servers { + go func(server string) { + for { + _, err := ns.Resolve(ctx, server) + ctx.Infof("%v: %v: %v", server, err, errors.Is(err, naming.ErrNoSuchName)) + if errors.Is(err, naming.ErrNoSuchName) { + time.Sleep(time.Second) + continue + } + if err == nil { + break + } + ctx.Infof("%v: %v\n", server, err) + } + wg.Done() + }(server) + } + wg.Wait() +} diff --git a/x/ref/examples/echo/echod/echod.go b/x/ref/examples/echo/echod/echod.go index 6153e51d0..785f4931b 100644 --- a/x/ref/examples/echo/echod/echod.go +++ b/x/ref/examples/echo/echod/echod.go @@ -76,7 +76,13 @@ func main() { ctx, shutdown := v23.Init() defer shutdown() - ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true)) + ctx, _ = vxray.InitXRay(ctx, + v23.GetRuntimeFlags().VtraceFlags, + xray.Config{ServiceVersion: ""}, + vxray.EC2Plugin(), + vxray.EKSCluster(), + vxray.ContainerIDAndHost(), + vxray.MergeLogging(true)) ctx, server, err := v23.WithNewServer(ctx, nameFlag, echo.EchoServiceServer(&echod{}), securityflag.NewAuthorizerOrDie(ctx)) if err != nil { diff --git a/x/ref/lib/aws/vxray/config.go b/x/ref/lib/aws/vxray/config.go index 1efeba47c..6dd038eb2 100644 --- a/x/ref/lib/aws/vxray/config.go +++ b/x/ref/lib/aws/vxray/config.go @@ -9,6 +9,7 @@ package vxray import ( "fmt" + "os" "github.com/aws/aws-xray-sdk-go/awsplugins/beanstalk" "github.com/aws/aws-xray-sdk-go/awsplugins/ec2" @@ -18,15 +19,18 @@ import ( "v.io/v23/context" "v.io/v23/logging" "v.io/v23/vtrace" + "v.io/x/ref/lib/aws/vxray/internal" "v.io/x/ref/lib/flags" libvtrace "v.io/x/ref/lib/vtrace" ) type options struct { - mergeLogging bool - mapToHTTP bool - newStore bool - newStoreFlags flags.VtraceFlags + mergeLogging bool + mapToHTTP bool + newStore bool + newStoreFlags flags.VtraceFlags + configMap, configMapKey string + containerized bool } // Option represents an option to InitXRay. @@ -53,6 +57,44 @@ func BeanstalkPlugin() Option { } } +// KubernetesCluster configures obtaining information about the process' +// current environment when running under Kubernetes (k8s), whether managed by +// AWS EKS or any other control plane implementation. It requires that the +// K8S configuration creates a configmap that contains the cluster name. +// The configMap argument names that configmap and configMapKey +// is the key in that configmap for the cluster name. For example, when using +// the AWS cloudwatch/insights/xray-daemon daemonset the values for those +// would be: +// /api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info +// cluster.name +// +// When configured, xray segments will contain a 'cluster_name' annotation. +func KubernetesCluster(configMap, configMapKey string) Option { + return func(o *options) { + o.configMap, o.configMapKey = configMap, configMapKey + } +} + +// EKSCluster calls KubernetesCluster with the values commonly used +// with EKS clusters. +func EKSCluster() Option { + return KubernetesCluster("/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info", "cluster.name") +} + +// ContainerIDAndHost requests that container id and host information be +// obtained and added to traces. The container id is obtained by parsing +// the /proc/self/cgroup file, and the host by call the operating system's +// hostname function. When running under kubernetes for example, a pod's +// name is configured as its hostname. +// +// When configured, xray segments will contain 'container_id' and 'container_host' +// annotations. +func ContainerIDAndHost() Option { + return func(o *options) { + o.containerized = true + } +} + // MergeLogging arrays for xray logging messages to be merged with vanadium // log messages. func MergeLogging(v bool) Option { @@ -105,44 +147,73 @@ func (xl *xraylogger) Log(level xraylog.LogLevel, msg fmt.Stringer) { } } -func initXRay(ctx *context.T, config xray.Config, opts []Option) (*context.T, *options, error) { - o := &options{mapToHTTP: true} - for _, fn := range opts { - fn(o) - } +func (m *manager) initXRay(ctx *context.T, config xray.Config) (*context.T, error) { if err := xray.Configure(config); err != nil { ctx.Errorf("failed to configure xray context: %v", err) - return ctx, nil, err + return ctx, err } - if o.mergeLogging { + if m.options.mergeLogging { xray.SetLogger(&xraylogger{context.LoggerFromContext(ctx)}) } ctx, err := WithConfig(ctx, config) - return ctx, o, err + return ctx, err } // InitXRay configures the AWS xray service and returns a context containing // the xray configuration. This should only be called once. The vflags argument // is used solely to check if xray tracing is enabled and not to create a -// new vtrace.Store, if a new store is required, the +// new vtrace.Store, if a new/alternate store is required, the WithNewStore option +// should be used to specify the store to be used. func InitXRay(ctx *context.T, vflags flags.VtraceFlags, config xray.Config, opts ...Option) (*context.T, error) { if !vflags.EnableAWSXRay { return ctx, nil } octx := ctx - ctx, options, err := initXRay(ctx, config, opts) + mgr := &manager{} + mgr.options.mapToHTTP = true + for _, fn := range opts { + fn(&mgr.options) + } + ctx, err := mgr.initXRay(ctx, config) if err != nil { return octx, err } - - if options.newStore { - store, err := libvtrace.NewStore(options.newStoreFlags) + if mgr.options.newStore { + store, err := libvtrace.NewStore(mgr.options.newStoreFlags) if err != nil { return octx, err } ctx = vtrace.WithStore(ctx, store) } - mgr := &manager{mapToHTTP: options.mapToHTTP} + if mgr.options.containerized { + if hostNameErr == nil { + mgr.containerHost = hostName + } else { + ctx.Infof("failed to obtain host name from: %v", hostNameErr) + } + cgroupFile := "/proc/self/cgroup" + if cid, err := internal.GetContainerID(cgroupFile); err == nil { + mgr.containerID = cid + } else { + ctx.Infof("failed to obtain container id", err) + } + } + if cm := mgr.options.configMap; len(cm) > 0 { + if clusterName, err := internal.GetEKSClusterName(ctx, cm, mgr.options.configMapKey); err == nil { + mgr.clusterName = clusterName + } else { + ctx.Infof("failed to obtain cluster name from %v.%v: %v", cm, mgr.options.configMapKey, err) + } + } ctx = vtrace.WithManager(ctx, mgr) return ctx, nil } + +var ( + hostName string + hostNameErr error +) + +func init() { + hostName, hostNameErr = os.Hostname() +} diff --git a/x/ref/lib/aws/vxray/internal/eks_info.go b/x/ref/lib/aws/vxray/internal/eks_info.go new file mode 100644 index 000000000..f8f277c98 --- /dev/null +++ b/x/ref/lib/aws/vxray/internal/eks_info.go @@ -0,0 +1,111 @@ +package internal + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" +) + +const ( + k8sServiceAccountPrefix = "/var/run/secrets/kubernetes.io/serviceaccount" + k8sCert = "ca.crt" + k8sToken = "token" + k8sAPIHost = "kubernetes.default.svc" + containerIDLen = 64 +) + +func GetEKSClusterName(ctx context.Context, configMap, keyName string) (string, error) { + cm, err := getConfigMap(ctx, configMap) + if err != nil { + return "", err + } + name, ok := cm[keyName] + if !ok { + return "", fmt.Errorf("cluster name key %v not found", keyName) + } + return name, nil +} + +func getConfigMap(ctx context.Context, configMap string) (map[string]string, error) { + rootPEM, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sCert)) + if err != nil { + return nil, err + } + token, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sToken)) + if err != nil { + return nil, err + } + + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(rootPEM) + if !ok { + panic("failed to parse root certificate") + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: roots, + }, + } + client := &http.Client{Transport: tr} + u := &url.URL{ + Scheme: "https", + Host: k8sAPIHost, + Path: configMap, + } + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+string(token)) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + buf := bytes.NewBuffer(make([]byte, 0, 1024)) + io.Copy(buf, resp.Body) + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s: ERROR: %v", buf.String(), resp.StatusCode) + } + cm := struct { + API string `json:"apiVersion"` + Data map[string]string `json:"data"` + }{} + if err := json.Unmarshal(buf.Bytes(), &cm); err != nil { + return nil, fmt.Errorf("%s: %v", buf.String(), err) + } + if cm.API != "v1" && len(cm.Data) == 0 { + return nil, fmt.Errorf("API version has changed to %v: found no config map data", cm.API) + + } + return cm.Data, nil +} + +func GetContainerID(cgroupFile string) (string, error) { + rd, err := os.Open(cgroupFile) + if err != nil { + return "", err + } + sc := bufio.NewScanner(rd) + cid := "" + for sc.Scan() { + line := sc.Text() + if l := len(line); l > containerIDLen { + cid = line[l-containerIDLen:] + break + } + } + if len(cid) == 0 { + return "", fmt.Errorf("failed to find a container id in %v", cgroupFile) + } + return cid, nil +} diff --git a/x/ref/lib/aws/vxray/manager.go b/x/ref/lib/aws/vxray/manager.go index 8156b690a..ad0380b09 100644 --- a/x/ref/lib/aws/vxray/manager.go +++ b/x/ref/lib/aws/vxray/manager.go @@ -25,15 +25,43 @@ import ( // Manager allows you to create new traces and spans and access the // vtrace store that type manager struct { - mapToHTTP bool + options options + clusterName string + containerID string + containerHost string +} + +func annotateIfSet(a map[string]interface{}, k, v string) { + if len(v) == 0 { + return + } + a[k] = v +} + +func (m *manager) annotateSegment(seg *xray.Segment, subseg bool) { + if seg.Annotations == nil { + return + } + if m.options.mapToHTTP && !subseg && (seg.HTTP == nil || seg.HTTP.Request == nil) { + // Only create fake http fields if they have not already been set. + hd := seg.GetHTTP() + req := hd.GetRequest() + mapAnnotation(seg.Annotations, "name", &req.URL) + mapAnnotation(seg.Annotations, "method", &req.Method) + mapAnnotation(seg.Annotations, "clientAddr", &req.ClientIP) + req.UserAgent = "vanadium" + } + annotateIfSet(seg.Annotations, "cluster_name", m.clusterName) + annotateIfSet(seg.Annotations, "container_id", m.containerID) + annotateIfSet(seg.Annotations, "container_host", m.containerHost) } type xrayspan struct { vtrace.Span - ctx *context.T - subseg bool - mapToHTTP bool - seg *xray.Segment + ctx *context.T + subseg bool + mgr *manager + seg *xray.Segment } func (xs *xrayspan) Annotate(msg string) { @@ -93,7 +121,8 @@ func (xs *xrayspan) Finish(err error) { xs.seg.AddMetadataToNamespace("vtrace", "error", err.Error()) } xseg := xs.seg - if an := xseg.Annotations; !xs.subseg && xs.mapToHTTP && an != nil { + xs.mgr.annotateSegment(xseg, xs.subseg) + /* if an := xseg.Annotations; !xs.subseg && xs.mapToHTTP && an != nil { if xseg.HTTP == nil || xseg.HTTP.Request == nil { // Only create fake http fields if they have not already been set. hd := xseg.GetHTTP() @@ -103,7 +132,7 @@ func (xs *xrayspan) Finish(err error) { mapAnnotation(an, "clientAddr", &req.ClientIP) req.UserAgent = "vanadium" } - } + }*/ if xs.subseg { xseg.CloseAndStream(err) } else { @@ -117,7 +146,7 @@ func (xs *xrayspan) Finish(err error) { // other span. This is useful when starting operations that are // disconnected from the activity ctx is performing. For example // this might be used to start background tasks. -func (m manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest) (*context.T, vtrace.Span) { +func (m *manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest) (*context.T, vtrace.Span) { id, err := uniqueid.Random() if err != nil { ctx.Errorf("vtrace: couldn't generate Trace Id, debug data may be lost: %v", err) @@ -135,7 +164,7 @@ func (m manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRe sampling := translateSamplingHeader(sr) _, seg := xray.NewSegmentFromHeader(ctx, name, sampling, hdr) ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg} return vtrace.WithSpan(ctx, xs), xs } @@ -219,7 +248,7 @@ func translateSamplingHeader(sr *vtrace.SamplingRequest) *http.Request { // a trace from a remote server. name is the name of the new span and // req contains the parameters needed to connect this span with it's // trace. -func (m manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest, req vtrace.Request) (*context.T, vtrace.Span) { +func (m *manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest, req vtrace.Request) (*context.T, vtrace.Span) { st := vtrace.GetStore(ctx) if st == nil { panic("nil store") @@ -249,13 +278,13 @@ func (m manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.Samp } ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg, subseg: sub} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub} return vtrace.WithSpan(ctx, xs), xs } // WithNewSpan derives a context with a new Span that can be used to // trace and annotate operations across process boundaries. -func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) { +func (m *manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) { if curSpan := vtrace.GetSpan(ctx); curSpan != nil { if curSpan.Store() == nil { panic("nil store") @@ -267,7 +296,7 @@ func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Sp seg, sub := newSegment(ctx, name, nil) ctx.VI(1).Infof("WithNewSpan: new seg: %v", segStr(seg)) ctx = WithSegment(ctx, seg) - xs := &xrayspan{Span: newSpan, ctx: ctx, mapToHTTP: m.mapToHTTP, seg: seg, subseg: sub} + xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub} return vtrace.WithSpan(ctx, xs), xs } ctx.Error("vtrace: creating a new child span from context with no existing span.") @@ -275,7 +304,7 @@ func (m manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Sp } // Request generates a vtrace.Request from the active Span. -func (m manager) GetRequest(ctx *context.T) vtrace.Request { +func (m *manager) GetRequest(ctx *context.T) vtrace.Request { if span := vtrace.GetSpan(ctx); span != nil { req := span.Request(ctx) if seg := GetSegment(ctx); seg != nil { @@ -288,7 +317,7 @@ func (m manager) GetRequest(ctx *context.T) vtrace.Request { } // Response captures the vtrace.Response for the active Span. -func (m manager) GetResponse(ctx *context.T) vtrace.Response { +func (m *manager) GetResponse(ctx *context.T) vtrace.Response { if span := vtrace.GetSpan(ctx); span != nil { return vtrace.Response{ Flags: span.Store().Flags(span.Trace()),