diff --git a/internal/cri/server/status.go b/internal/cri/server/status.go
index a88872445891c..4e9722dfae567 100644
--- a/internal/cri/server/status.go
+++ b/internal/cri/server/status.go
@@ -23,6 +23,7 @@ import (
 	"maps"
 	goruntime "runtime"
 	"slices"
+	"sort"
 
 	"github.com/containerd/containerd/api/services/introspection/v1"
 	"github.com/containerd/log"
@@ -59,9 +60,15 @@ func (c *criService) Status(ctx context.Context, r *runtime.StatusRequest) (*run
 			runtimeCondition,
 			networkCondition,
 		}},
-		RuntimeHandlers: slices.Collect(maps.Values(c.runtimeHandlers)),
-		Features:        c.runtimeFeatures,
+		Features: c.runtimeFeatures,
 	}
+
+	// Ensure stable ordering of runtime handlers in response
+	resp.RuntimeHandlers = slices.Collect(maps.Values(c.runtimeHandlers))
+	sort.SliceStable(resp.RuntimeHandlers, func(i, j int) bool {
+		return resp.RuntimeHandlers[i].Name < resp.RuntimeHandlers[j].Name
+	})
+
 	if r.Verbose {
 		configByt, err := json.Marshal(c.config)
 		if err != nil {
diff --git a/internal/cri/server/status_test.go b/internal/cri/server/status_test.go
index efd380b9b1066..6ab9ffc77fe5d 100644
--- a/internal/cri/server/status_test.go
+++ b/internal/cri/server/status_test.go
@@ -17,9 +17,15 @@
 package server
 
 import (
+	"context"
+	"reflect"
 	"testing"
+	"unsafe"
 
 	"github.com/containerd/containerd/api/services/introspection/v1"
+	containerd "github.com/containerd/containerd/v2/client"
+	coreintrospection "github.com/containerd/containerd/v2/core/introspection"
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
 )
@@ -48,3 +54,92 @@ func TestRuntimeConditionContainerdHasNoDeprecationWarnings(t *testing.T) {
 		Status: true,
 	}, cond)
 }
+
+// fakeIntrospectionService is a minimal stub that implements the
+// coreintrospection.Service. We need this because criService.Status()
+// invokes the client.IntrospectionService() method.
+type fakeIntrospectionService struct{}
+
+var _ coreintrospection.Service = fakeIntrospectionService{}
+
+func (fakeIntrospectionService) Plugins(ctx context.Context, _ ...string) (*introspection.PluginsResponse, error) {
+	return &introspection.PluginsResponse{}, nil
+}
+
+func (fakeIntrospectionService) Server(ctx context.Context) (*introspection.ServerResponse, error) {
+	return &introspection.ServerResponse{}, nil
+}
+
+func (fakeIntrospectionService) PluginInfo(ctx context.Context, _ string, _ string, _ any) (*introspection.PluginInfoResponse, error) {
+	return &introspection.PluginInfoResponse{}, nil
+}
+
+// newFakeContainerdClient returns a *containerd.Client with a stub
+// IntrospectionService injected via reflection. This avoids needing a real
+// gRPC connection while satisfying criService.Status().
+func newFakeContainerdClient() *containerd.Client {
+	c := &containerd.Client{}
+	sv := reflect.ValueOf(c).Elem().FieldByName("services")
+	if !sv.IsValid() {
+		return c
+	}
+	f := sv.FieldByName("introspectionService")
+	if !f.IsValid() {
+		return c
+	}
+	// Make the unexported/private field introspectionService settable
+	f = reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem()
+	f.Set(reflect.ValueOf(fakeIntrospectionService{}).Convert(f.Type()))
+	return c
+}
+
+// newStatusTestCRIService creates a minimal CRI service for testing
+func newStatusTestCRIService() *criService {
+	return &criService{
+		client:          newFakeContainerdClient(),
+		runtimeHandlers: make(map[string]*runtime.RuntimeHandler),
+	}
+}
+
+// TestStatusRuntimeHandlersOrdering checks that the runtime handlers
+// returned by Status() are in the same order every time
+func TestStatusRuntimeHandlersOrdering(t *testing.T) {
+	c := newStatusTestCRIService()
+
+	// Forge many runtime handlers to lower risk of accidental stable
+	// ordering on consecutive Status() calls
+	const numHandlers = 100
+	handlers := make(map[string]*runtime.RuntimeHandler, numHandlers)
+	for range numHandlers {
+		h := &runtime.RuntimeHandler{Name: "random-" + uuid.New().String()}
+		handlers[h.Name] = h
+	}
+	c.runtimeHandlers = handlers
+
+	// handlerNames calls Status() and returns the runtime handler names in
+	// response order. It stops the test when Status() fails so that the
+	// response is only read after a successful call.
+	handlerNames := func() []string {
+		t.Helper()
+		resp, err := c.Status(context.Background(), &runtime.StatusRequest{})
+		if !assert.NoError(t, err) {
+			t.FailNow()
+		}
+		names := make([]string, 0, len(resp.RuntimeHandlers))
+		for _, h := range resp.RuntimeHandlers {
+			names = append(names, h.Name)
+		}
+		return names
+	}
+
+	// Call Status() twice
+	names1 := handlerNames()
+	names2 := handlerNames()
+	assert.Len(t, names1, len(handlers), "Unexpected number of runtime handlers")
+	assert.Len(t, names2, len(handlers), "Unexpected number of runtime handlers")
+
+	// Fail if runtime handlers order varies across calls to Status().
+	// Comparing whole slices also reports a length mismatch as a failure,
+	// where an index-based loop would panic.
+	assert.Equal(t, names1, names2, "RuntimeHandlers order is unstable across Status() calls")
+}
diff --git a/pkg/imageverifier/bindir/bindir.go b/pkg/imageverifier/bindir/bindir.go
index 198f7643a452a..cc053c7f0557e 100644
--- a/pkg/imageverifier/bindir/bindir.go
+++ b/pkg/imageverifier/bindir/bindir.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/containerd/containerd/v2/internal/tomlext"
 	"github.com/containerd/containerd/v2/pkg/imageverifier"
+	"github.com/containerd/containerd/v2/pkg/tracing"
 	"github.com/containerd/log"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 )
@@ -123,6 +124,14 @@ func (v *ImageVerifier) runVerifier(ctx context.Context, bin string, imageName s
 
 	cmd := exec.CommandContext(ctx, binPath, args...)
 
+	// Attach OTEL propagators trace context env var to the child process
+	if traceContext, err := tracing.GetPropagatorsTraceContext(ctx); err != nil {
+		log.G(ctx).WithError(err).Warn("could not marshal propagators trace context")
+	} else {
+		traceContextEnv := fmt.Sprintf("OTEL_PROPAGATORS_TRACE_CONTEXT=%s", traceContext)
+		cmd.Env = append(os.Environ(), traceContextEnv)
+	}
+
 	// We construct our own pipes instead of using the default StdinPipe,
 	// StoutPipe, and StderrPipe in order to set timeouts on reads and writes.
 	stdinRead, stdinWrite, err := os.Pipe()
diff --git a/pkg/tracing/plugin/otlp.go b/pkg/tracing/plugin/otlp.go
index 5da9ad80bb57a..dfa7e792de88e 100644
--- a/pkg/tracing/plugin/otlp.go
+++ b/pkg/tracing/plugin/otlp.go
@@ -22,6 +22,7 @@ import (
 	"io"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/containerd/containerd/v2/pkg/deprecation"
@@ -52,8 +53,10 @@ const (
 	otlpProtocolEnv       = "OTEL_EXPORTER_OTLP_PROTOCOL"
 	otlpTracesProtocolEnv = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
 
-	otelTracesExporterEnv = "OTEL_TRACES_EXPORTER"
-	otelServiceNameEnv    = "OTEL_SERVICE_NAME"
+	otelTracesExporterEnv   = "OTEL_TRACES_EXPORTER"
+	otelServiceNameEnv      = "OTEL_SERVICE_NAME"
+	otelTracesSamplerEnv    = "OTEL_TRACES_SAMPLER"
+	otelTracesSamplerArgEnv = "OTEL_TRACES_SAMPLER_ARG"
 )
 
 func init() {
@@ -193,6 +196,15 @@ func newTracer(ctx context.Context, procs []trace.SpanProcessor) (io.Closer, err
 	for _, proc := range procs {
 		opts = append(opts, trace.WithSpanProcessor(proc))
 	}
+
+	// Configure custom NameBased sampler if specified in the env
+	if nameSampler := nameSamplerFromEnv(); nameSampler != nil {
+		opts = append(opts, trace.WithSampler(nameSampler))
+		// Unset the env vars so that otel sdk does not attempt to configure the sampler automatically
+		os.Unsetenv(otelTracesSamplerEnv)
+		os.Unsetenv(otelTracesSamplerArgEnv)
+	}
+
 	provider := trace.NewTracerProvider(opts...)
 
 	otel.SetTracerProvider(provider)
@@ -202,6 +214,36 @@ func newTracer(ctx context.Context, procs []trace.SpanProcessor) (io.Closer, err
 
 }
 
+// nameSamplerFromEnv returns the name-based sampler selected by the
+// OTEL_TRACES_SAMPLER environment variable, or nil when the variable is
+// unset or holds any other value.
+//
+// The allowed span names are read from OTEL_TRACES_SAMPLER_ARG as a
+// comma-separated list. Whitespace around each name is trimmed and empty
+// entries are skipped, so an unset or empty argument allows no span name.
+func nameSamplerFromEnv() trace.Sampler {
+	sampler, ok := os.LookupEnv(otelTracesSamplerEnv)
+	if !ok {
+		return nil
+	}
+
+	var allowedNames []string
+	for _, name := range strings.Split(os.Getenv(otelTracesSamplerArgEnv), ",") {
+		if name = strings.TrimSpace(name); name != "" {
+			allowedNames = append(allowedNames, name)
+		}
+	}
+
+	switch strings.ToLower(strings.TrimSpace(sampler)) {
+	case samplerNameBased:
+		return NameBased(allowedNames)
+	case samplerParentBasedName:
+		return trace.ParentBased(NameBased(allowedNames))
+	default:
+		return nil
+	}
+}
+
 func warnTraceConfig(ic *plugin.InitContext) error {
 	if ic.Config == nil {
 		return nil
diff --git a/pkg/tracing/plugin/sampler.go b/pkg/tracing/plugin/sampler.go
new file mode 100644
index 0000000000000..6f87a0d1ddb2e
--- /dev/null
+++ b/pkg/tracing/plugin/sampler.go
@@ -0,0 +1,55 @@
+package plugin
+
+import (
+	"fmt"
+
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// samplerNameBased and samplerParentBasedName are the OTEL_TRACES_SAMPLER
+// values that select the name-based sampler.
+const (
+	samplerNameBased       = "namebased"
+	samplerParentBasedName = "parentbased_name"
+)
+
+// NameSampler is a sampler that decides on the span name alone: spans whose
+// name is in its allow set are recorded and sampled, all others are dropped.
+type NameSampler struct {
+	// allow is the set of span names that should be sampled. The empty
+	// struct values carry no data; only key membership matters.
+	allow map[string]struct{}
+}
+
+// NameBased returns a Sampler that samples every span whose name is listed in
+// allowedNames. It should be used in conjunction with the ParentBased sampler
+// so that the child spans are also sampled.
+func NameBased(allowedNames []string) NameSampler {
+	allow := make(map[string]struct{}, len(allowedNames))
+	for _, name := range allowedNames {
+		allow[name] = struct{}{}
+	}
+	return NameSampler{allow: allow}
+}
+
+// ShouldSample records and samples the span when its name is in the allow
+// set and drops it otherwise. The trace state of the parent span context is
+// returned unchanged in both cases.
+func (ns NameSampler) ShouldSample(parameters sdktrace.SamplingParameters) sdktrace.SamplingResult {
+	decision := sdktrace.Drop
+	if _, ok := ns.allow[parameters.Name]; ok {
+		decision = sdktrace.RecordAndSample
+	}
+
+	return sdktrace.SamplingResult{
+		Decision:   decision,
+		Tracestate: trace.SpanContextFromContext(parameters.ParentContext).TraceState(),
+	}
+}
+
+// Description returns a human-readable description of the sampler that
+// includes the allowed span names.
+func (ns NameSampler) Description() string {
+	return fmt.Sprintf("NameBased:{%v}", ns.allow)
+}
diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go
index 48d760feb8f75..310272e43ad16 100644
--- a/pkg/tracing/tracing.go
+++ b/pkg/tracing/tracing.go
@@ -18,6 +18,7 @@ package tracing
 
 import (
 	"context"
+	"encoding/json"
 	"net/http"
 	"strings"
 
@@ -25,6 +26,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/propagation"
 	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
 	"go.opentelemetry.io/otel/trace"
 )
@@ -130,3 +132,12 @@ func Attribute(k string, v any) attribute.KeyValue {
 func HTTPStatusCodeAttributes(code int) []attribute.KeyValue {
 	return []attribute.KeyValue{semconv.HTTPStatusCodeKey.Int(code)}
 }
+
+// GetPropagatorsTraceContext injects the trace context carried by ctx into a
+// propagation.MapCarrier using the TraceContext propagator and returns the
+// carrier encoded as a JSON object, as bytes.
+func GetPropagatorsTraceContext(ctx context.Context) ([]byte, error) {
+	carrier := propagation.MapCarrier{}
+	propagation.TraceContext{}.Inject(ctx, carrier)
+	return json.Marshal(carrier)
+}