Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions x/ref/examples/echo/bin/Dockerfile.echo
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM alpine
COPY echo /bin/echo
COPY creds/ /bin/creds/
ENTRYPOINT ["/bin/echo"]
4 changes: 4 additions & 0 deletions x/ref/examples/echo/bin/Dockerfile.echod
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM alpine
COPY echod /bin/echod
COPY creds/ /bin/creds/
ENTRYPOINT ["/bin/echod"]
4 changes: 4 additions & 0 deletions x/ref/examples/echo/bin/Dockerfile.mounttabled
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM alpine
COPY mounttabled /bin/mounttabled
COPY creds/ /bin/creds/
ENTRYPOINT ["/bin/mounttabled"]
50 changes: 44 additions & 6 deletions x/ref/examples/echo/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package main

import (
"errors"
"flag"
"fmt"
"io"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/aws/aws-xray-sdk-go/xray"
v23 "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/vtrace"
"v.io/x/ref/examples/echo"
"v.io/x/ref/lib/aws/vxray"
Expand Down Expand Up @@ -50,7 +52,20 @@ func main() {
ctx, shutdown := v23.Init()
defer shutdown()

ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
ctx, _ = vxray.InitXRay(ctx,
v23.GetRuntimeFlags().VtraceFlags,
xray.Config{ServiceVersion: ""},
vxray.EC2Plugin(),
vxray.EKSCluster(),
vxray.ContainerIDAndHost(),
vxray.MergeLogging(true))

servers := strings.Split(serverFlag, ",")
if len(servers) > 0 {
ctx.Infof("waiting for: %v servers: %v", len(servers), servers)
waitForServers(ctx, servers)
ctx.Infof("servers ready: %v", servers)
}

client := echo.EchoServiceClient(nameFlag)

Expand All @@ -59,7 +74,7 @@ func main() {
if len(httpAddr) > 0 {
wg.Add(1)
go func() {
runHttpServer(ctx, httpAddr, client)
runHTTPServer(ctx, httpAddr, client)
wg.Done()
}()
}
Expand All @@ -73,7 +88,6 @@ func main() {
close(done)
}()

servers := strings.Split(serverFlag, ",")
samplingRequest := &vtrace.SamplingRequest{
Name: nameFlag,
}
Expand Down Expand Up @@ -106,7 +120,7 @@ func main() {
}()
select {
case <-done:
time.Sleep(1)
time.Sleep(time.Second * 2)
case <-signals.ShutdownOnSignals(ctx):
}
}
Expand All @@ -115,7 +129,7 @@ func main() {
// <addr>/call and <addr>/call?forward-to=<server>
// will issue RPCs to the echo server.
// <addr>/quit will cause the client to exit gracefully.
func runHttpServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) {
func runHTTPServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) {
xrayHandler := xray.Handler(
xray.NewFixedSegmentNamer("http.echo.client"),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -169,7 +183,7 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
}
result, err := client.Ping(ctx, now, servers)
if err != nil {
ctx.Errorf("%v.%v failed: %v", nameFlag, "ping", err)
ctx.Errorf("%v.%v failed: %v", servers, "ping", err)
}
if len(result) < 100 {
fmt.Fprintln(out, result)
Expand All @@ -178,3 +192,27 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
}
return err
}

func waitForServers(ctx *context.T, servers []string) {
var wg sync.WaitGroup
wg.Add(len(servers))
ns := v23.GetNamespace(ctx)
for _, server := range servers {
go func(server string) {
for {
_, err := ns.Resolve(ctx, server)
ctx.Infof("%v: %v: %v", server, err, errors.Is(err, naming.ErrNoSuchName))
if errors.Is(err, naming.ErrNoSuchName) {
time.Sleep(time.Second)
continue
}
if err == nil {
break
}
ctx.Infof("%v: %v\n", server, err)
}
wg.Done()
}(server)
}
wg.Wait()
}
8 changes: 7 additions & 1 deletion x/ref/examples/echo/echod/echod.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ func main() {
ctx, shutdown := v23.Init()
defer shutdown()

ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
ctx, _ = vxray.InitXRay(ctx,
v23.GetRuntimeFlags().VtraceFlags,
xray.Config{ServiceVersion: ""},
vxray.EC2Plugin(),
vxray.EKSCluster(),
vxray.ContainerIDAndHost(),
vxray.MergeLogging(true))

ctx, server, err := v23.WithNewServer(ctx, nameFlag, echo.EchoServiceServer(&echod{}), securityflag.NewAuthorizerOrDie(ctx))
if err != nil {
Expand Down
107 changes: 89 additions & 18 deletions x/ref/lib/aws/vxray/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package vxray

import (
"fmt"
"os"

"github.com/aws/aws-xray-sdk-go/awsplugins/beanstalk"
"github.com/aws/aws-xray-sdk-go/awsplugins/ec2"
Expand All @@ -18,15 +19,18 @@ import (
"v.io/v23/context"
"v.io/v23/logging"
"v.io/v23/vtrace"
"v.io/x/ref/lib/aws/vxray/internal"
"v.io/x/ref/lib/flags"
libvtrace "v.io/x/ref/lib/vtrace"
)

type options struct {
mergeLogging bool
mapToHTTP bool
newStore bool
newStoreFlags flags.VtraceFlags
mergeLogging bool
mapToHTTP bool
newStore bool
newStoreFlags flags.VtraceFlags
configMap, configMapKey string
containerized bool
}

// Option represents an option to InitXRay.
Expand All @@ -53,6 +57,44 @@ func BeanstalkPlugin() Option {
}
}

// KubernetesCluster configures obtaining information about the process'
// current environment when running under Kubernetes (k8s), whether managed by
// AWS EKS or any other control plane implementation. It requires that the
// K8S configuration creates a configmap that contains the cluster name.
// The configMap argument names that configmap and configMapKey
// is the key in that configmap for the cluster name. For example, when using
// the AWS cloudwatch/insights/xray-daemon daemonset the values for those
// would be:
// /api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info
// cluster.name
//
// When configured, xray segments will contain a 'cluster_name' annotation.
func KubernetesCluster(configMap, configMapKey string) Option {
return func(o *options) {
o.configMap, o.configMapKey = configMap, configMapKey
}
}

// EKSCluster calls KubernetesCluster with the values commonly used
// with EKS clusters.
func EKSCluster() Option {
return KubernetesCluster("/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info", "cluster.name")
}

// ContainerIDAndHost requests that container id and host information be
// obtained and added to traces. The container id is obtained by parsing
// the /proc/self/cgroup file, and the host by call the operating system's
// hostname function. When running under kubernetes for example, a pod's
// name is configured as its hostname.
//
// When configured, xray segments will contain 'container_id' and 'container_host'
// annotations.
func ContainerIDAndHost() Option {
return func(o *options) {
o.containerized = true
}
}

// MergeLogging arrays for xray logging messages to be merged with vanadium
// log messages.
func MergeLogging(v bool) Option {
Expand Down Expand Up @@ -105,44 +147,73 @@ func (xl *xraylogger) Log(level xraylog.LogLevel, msg fmt.Stringer) {
}
}

func initXRay(ctx *context.T, config xray.Config, opts []Option) (*context.T, *options, error) {
o := &options{mapToHTTP: true}
for _, fn := range opts {
fn(o)
}
func (m *manager) initXRay(ctx *context.T, config xray.Config) (*context.T, error) {
if err := xray.Configure(config); err != nil {
ctx.Errorf("failed to configure xray context: %v", err)
return ctx, nil, err
return ctx, err
}
if o.mergeLogging {
if m.options.mergeLogging {
xray.SetLogger(&xraylogger{context.LoggerFromContext(ctx)})
}
ctx, err := WithConfig(ctx, config)
return ctx, o, err
return ctx, err
}

// InitXRay configures the AWS xray service and returns a context containing
// the xray configuration. This should only be called once. The vflags argument
// is used solely to check if xray tracing is enabled and not to create a
// new vtrace.Store, if a new store is required, the
// new vtrace.Store, if a new/alternate store is required, the WithNewStore option
// should be used to specify the store to be used.
func InitXRay(ctx *context.T, vflags flags.VtraceFlags, config xray.Config, opts ...Option) (*context.T, error) {
if !vflags.EnableAWSXRay {
return ctx, nil
}
octx := ctx
ctx, options, err := initXRay(ctx, config, opts)
mgr := &manager{}
mgr.options.mapToHTTP = true
for _, fn := range opts {
fn(&mgr.options)
}
ctx, err := mgr.initXRay(ctx, config)
if err != nil {
return octx, err
}

if options.newStore {
store, err := libvtrace.NewStore(options.newStoreFlags)
if mgr.options.newStore {
store, err := libvtrace.NewStore(mgr.options.newStoreFlags)
if err != nil {
return octx, err
}
ctx = vtrace.WithStore(ctx, store)
}
mgr := &manager{mapToHTTP: options.mapToHTTP}
if mgr.options.containerized {
if hostNameErr == nil {
mgr.containerHost = hostName
} else {
ctx.Infof("failed to obtain host name from: %v", hostNameErr)
}
cgroupFile := "/proc/self/cgroup"
if cid, err := internal.GetContainerID(cgroupFile); err == nil {
mgr.containerID = cid
} else {
ctx.Infof("failed to obtain container id", err)
}
}
if cm := mgr.options.configMap; len(cm) > 0 {
if clusterName, err := internal.GetEKSClusterName(ctx, cm, mgr.options.configMapKey); err == nil {
mgr.clusterName = clusterName
} else {
ctx.Infof("failed to obtain cluster name from %v.%v: %v", cm, mgr.options.configMapKey, err)
}
}
ctx = vtrace.WithManager(ctx, mgr)
return ctx, nil
}

var (
hostName string
hostNameErr error
)

func init() {
hostName, hostNameErr = os.Hostname()
}
Loading