-
Notifications
You must be signed in to change notification settings - Fork 2.1k
cli: add otel sdk tracing and metric providers to the core cli #4889
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
neersighted
merged 1 commit into
docker:master
from
jsternberg:universal-telemetry-client
Mar 25, 2024
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,202 @@ | ||
| package command | ||
|
|
||
| import ( | ||
| "context" | ||
| "os" | ||
| "path/filepath" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/docker/distribution/uuid" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/metric" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
| "go.opentelemetry.io/otel/sdk/resource" | ||
| sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
| semconv "go.opentelemetry.io/otel/semconv/v1.21.0" | ||
| "go.opentelemetry.io/otel/trace" | ||
| ) | ||
|
|
||
// exportTimeout is the deadline applied to telemetry exports: it bounds the
// metric export performed on shutdown and is passed to the span batcher as
// its export timeout.
const exportTimeout time.Duration = 50 * time.Millisecond
|
|
||
| // TracerProvider is an extension of the trace.TracerProvider interface for CLI programs. | ||
| type TracerProvider interface { | ||
| trace.TracerProvider | ||
| ForceFlush(ctx context.Context) error | ||
| Shutdown(ctx context.Context) error | ||
| } | ||
|
|
||
| // MeterProvider is an extension of the metric.MeterProvider interface for CLI programs. | ||
| type MeterProvider interface { | ||
| metric.MeterProvider | ||
| ForceFlush(ctx context.Context) error | ||
| Shutdown(ctx context.Context) error | ||
| } | ||
|
|
||
| // TelemetryClient provides the methods for using OTEL tracing or metrics. | ||
| type TelemetryClient interface { | ||
| // Resource returns the OTEL Resource configured with this TelemetryClient. | ||
| // This resource may be created lazily, but the resource should be the same | ||
| // each time this function is invoked. | ||
| Resource() *resource.Resource | ||
|
|
||
| // TracerProvider returns a TracerProvider. This TracerProvider will be configured | ||
| // with the default tracing components for a CLI program along with any options given | ||
| // for the SDK. | ||
| TracerProvider(ctx context.Context, opts ...sdktrace.TracerProviderOption) TracerProvider | ||
|
|
||
| // MeterProvider returns a MeterProvider. This MeterProvider will be configured | ||
| // with the default metric components for a CLI program along with any options given | ||
| // for the SDK. | ||
| MeterProvider(ctx context.Context, opts ...sdkmetric.Option) MeterProvider | ||
| } | ||
|
|
||
| func (cli *DockerCli) Resource() *resource.Resource { | ||
| return cli.res.Get() | ||
| } | ||
|
|
||
| func (cli *DockerCli) TracerProvider(ctx context.Context, opts ...sdktrace.TracerProviderOption) TracerProvider { | ||
| allOpts := make([]sdktrace.TracerProviderOption, 0, len(opts)+2) | ||
| allOpts = append(allOpts, sdktrace.WithResource(cli.Resource())) | ||
| allOpts = append(allOpts, dockerSpanExporter(ctx, cli)...) | ||
| allOpts = append(allOpts, opts...) | ||
| return sdktrace.NewTracerProvider(allOpts...) | ||
| } | ||
|
|
||
| func (cli *DockerCli) MeterProvider(ctx context.Context, opts ...sdkmetric.Option) MeterProvider { | ||
| allOpts := make([]sdkmetric.Option, 0, len(opts)+2) | ||
| allOpts = append(allOpts, sdkmetric.WithResource(cli.Resource())) | ||
| allOpts = append(allOpts, dockerMetricExporter(ctx, cli)...) | ||
| allOpts = append(allOpts, opts...) | ||
| return sdkmetric.NewMeterProvider(allOpts...) | ||
| } | ||
|
|
||
| // WithResourceOptions configures additional options for the default resource. The default | ||
| // resource will continue to include its default options. | ||
| func WithResourceOptions(opts ...resource.Option) CLIOption { | ||
| return func(cli *DockerCli) error { | ||
| cli.res.AppendOptions(opts...) | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // WithResource overwrites the default resource and prevents its creation. | ||
| func WithResource(res *resource.Resource) CLIOption { | ||
| return func(cli *DockerCli) error { | ||
| cli.res.Set(res) | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| type telemetryResource struct { | ||
| res *resource.Resource | ||
| opts []resource.Option | ||
| once sync.Once | ||
| } | ||
|
|
||
| func (r *telemetryResource) Set(res *resource.Resource) { | ||
| r.res = res | ||
| } | ||
|
|
||
| func (r *telemetryResource) Get() *resource.Resource { | ||
| r.once.Do(r.init) | ||
| return r.res | ||
| } | ||
|
|
||
| func (r *telemetryResource) init() { | ||
| if r.res != nil { | ||
| r.opts = nil | ||
| return | ||
| } | ||
|
|
||
| opts := append(r.defaultOptions(), r.opts...) | ||
| res, err := resource.New(context.Background(), opts...) | ||
| if err != nil { | ||
| otel.Handle(err) | ||
| } | ||
| r.res = res | ||
|
|
||
| // Clear the resource options since they'll never be used again and to allow | ||
| // the garbage collector to retrieve that memory. | ||
| r.opts = nil | ||
| } | ||
|
|
||
| func (r *telemetryResource) defaultOptions() []resource.Option { | ||
| return []resource.Option{ | ||
| resource.WithDetectors(serviceNameDetector{}), | ||
| resource.WithAttributes( | ||
| // Use a unique instance id so OTEL knows that each invocation | ||
| // of the CLI is its own instance. Without this, downstream | ||
| // OTEL processors may think the same process is restarting | ||
| // continuously. | ||
| semconv.ServiceInstanceID(uuid.Generate().String()), | ||
| ), | ||
| resource.WithFromEnv(), | ||
| resource.WithTelemetrySDK(), | ||
| } | ||
| } | ||
|
|
||
| func (r *telemetryResource) AppendOptions(opts ...resource.Option) { | ||
| if r.res != nil { | ||
| return | ||
| } | ||
| r.opts = append(r.opts, opts...) | ||
| } | ||
|
|
||
| type serviceNameDetector struct{} | ||
|
|
||
| func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, error) { | ||
| return resource.StringDetector( | ||
| semconv.SchemaURL, | ||
| semconv.ServiceNameKey, | ||
| func() (string, error) { | ||
| return filepath.Base(os.Args[0]), nil | ||
laurazard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }, | ||
| ).Detect(ctx) | ||
| } | ||
|
|
||
| // cliReader is an implementation of Reader that will automatically | ||
| // report to a designated Exporter when Shutdown is called. | ||
| type cliReader struct { | ||
| sdkmetric.Reader | ||
| exporter sdkmetric.Exporter | ||
| } | ||
|
|
||
| func newCLIReader(exp sdkmetric.Exporter) sdkmetric.Reader { | ||
| reader := sdkmetric.NewManualReader( | ||
| sdkmetric.WithTemporalitySelector(deltaTemporality), | ||
| ) | ||
| return &cliReader{ | ||
| Reader: reader, | ||
| exporter: exp, | ||
| } | ||
| } | ||
|
|
||
| func (r *cliReader) Shutdown(ctx context.Context) error { | ||
| var rm metricdata.ResourceMetrics | ||
| if err := r.Reader.Collect(ctx, &rm); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Place a pretty tight constraint on the actual reporting. | ||
| // We don't want CLI metrics to prevent the CLI from exiting | ||
| // so if there's some kind of issue we need to abort pretty | ||
| // quickly. | ||
| ctx, cancel := context.WithTimeout(ctx, exportTimeout) | ||
| defer cancel() | ||
|
|
||
| return r.exporter.Export(ctx, &rm) | ||
| } | ||
|
|
||
| // deltaTemporality sets the Temporality of every instrument to delta. | ||
| // | ||
| // This isn't really needed since we create a unique resource on each invocation, | ||
| // but it can help with cardinality concerns for downstream processors since they can | ||
| // perform aggregation for a time interval and then discard the data once that time | ||
| // period has passed. Cumulative temporality would imply to the downstream processor | ||
| // that they might receive a successive point and they may unnecessarily keep state | ||
| // they really shouldn't. | ||
| func deltaTemporality(_ sdkmetric.InstrumentKind) metricdata.Temporality { | ||
| return metricdata.DeltaTemporality | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| package command | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "net/url" | ||
| "path" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" | ||
| "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
| ) | ||
|
|
||
// otelContextFieldName is the key under which the OTEL configuration is
// looked up in the docker context metadata.
const otelContextFieldName string = "otel"
|
|
||
| // dockerExporterOTLPEndpoint retrieves the OTLP endpoint used for the docker reporter | ||
| // from the current context. | ||
| func dockerExporterOTLPEndpoint(cli Cli) (endpoint string, secure bool) { | ||
| meta, err := cli.ContextStore().GetMetadata(cli.CurrentContext()) | ||
| if err != nil { | ||
| otel.Handle(err) | ||
| return "", false | ||
| } | ||
|
|
||
| var otelCfg any | ||
| switch m := meta.Metadata.(type) { | ||
| case DockerContext: | ||
| otelCfg = m.AdditionalFields[otelContextFieldName] | ||
| case map[string]any: | ||
| otelCfg = m[otelContextFieldName] | ||
| } | ||
|
|
||
| if otelCfg == nil { | ||
| return "", false | ||
| } | ||
|
|
||
| otelMap, ok := otelCfg.(map[string]any) | ||
| if !ok { | ||
| otel.Handle(errors.Errorf( | ||
| "unexpected type for field %q: %T (expected: %T)", | ||
| otelContextFieldName, | ||
| otelCfg, | ||
| otelMap, | ||
| )) | ||
| return "", false | ||
| } | ||
|
|
||
| // keys from https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/ | ||
| endpoint, ok = otelMap["OTEL_EXPORTER_OTLP_ENDPOINT"].(string) | ||
| if !ok { | ||
| return "", false | ||
| } | ||
|
|
||
| // Parse the endpoint. The docker config expects the endpoint to be | ||
| // in the form of a URL to match the environment variable, but this | ||
| // option doesn't correspond directly to WithEndpoint. | ||
| // | ||
| // We pretend we're the same as the environment reader. | ||
| u, err := url.Parse(endpoint) | ||
| if err != nil { | ||
| otel.Handle(errors.Errorf("docker otel endpoint is invalid: %s", err)) | ||
| return "", false | ||
| } | ||
|
|
||
| switch u.Scheme { | ||
| case "unix": | ||
| // Unix sockets are a bit weird. OTEL seems to imply they | ||
| // can be used as an environment variable and are handled properly, | ||
| // but they don't seem to be as the behavior of the environment variable | ||
| // is to strip the scheme from the endpoint, but the underlying implementation | ||
| // needs the scheme to use the correct resolver. | ||
| // | ||
| // We'll just handle this in a special way and add the unix:// back to the endpoint. | ||
| endpoint = fmt.Sprintf("unix://%s", path.Join(u.Host, u.Path)) | ||
| case "https": | ||
| secure = true | ||
| fallthrough | ||
| case "http": | ||
| endpoint = path.Join(u.Host, u.Path) | ||
| } | ||
| return endpoint, secure | ||
| } | ||
|
|
||
| func dockerSpanExporter(ctx context.Context, cli Cli) []sdktrace.TracerProviderOption { | ||
| endpoint, secure := dockerExporterOTLPEndpoint(cli) | ||
| if endpoint == "" { | ||
| return nil | ||
| } | ||
|
|
||
| opts := []otlptracegrpc.Option{ | ||
| otlptracegrpc.WithEndpoint(endpoint), | ||
| } | ||
| if !secure { | ||
| opts = append(opts, otlptracegrpc.WithInsecure()) | ||
| } | ||
|
|
||
| exp, err := otlptracegrpc.New(ctx, opts...) | ||
| if err != nil { | ||
| otel.Handle(err) | ||
| return nil | ||
| } | ||
| return []sdktrace.TracerProviderOption{sdktrace.WithBatcher(exp, sdktrace.WithExportTimeout(exportTimeout))} | ||
| } | ||
|
|
||
| func dockerMetricExporter(ctx context.Context, cli Cli) []sdkmetric.Option { | ||
| endpoint, secure := dockerExporterOTLPEndpoint(cli) | ||
| if endpoint == "" { | ||
| return nil | ||
| } | ||
|
|
||
| opts := []otlpmetricgrpc.Option{ | ||
| otlpmetricgrpc.WithEndpoint(endpoint), | ||
| } | ||
| if !secure { | ||
| opts = append(opts, otlpmetricgrpc.WithInsecure()) | ||
| } | ||
|
|
||
| exp, err := otlpmetricgrpc.New(ctx, opts...) | ||
krissetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if err != nil { | ||
| otel.Handle(err) | ||
| return nil | ||
| } | ||
| return []sdkmetric.Option{sdkmetric.WithReader(newCLIReader(exp))} | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.