Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions internal/cri/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"maps"
goruntime "runtime"
"slices"
"sort"

"github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/log"
Expand Down Expand Up @@ -59,9 +60,15 @@ func (c *criService) Status(ctx context.Context, r *runtime.StatusRequest) (*run
runtimeCondition,
networkCondition,
}},
RuntimeHandlers: slices.Collect(maps.Values(c.runtimeHandlers)),
Features: c.runtimeFeatures,
Features: c.runtimeFeatures,
}

// Ensure stable ordering of runtime handlers in response
resp.RuntimeHandlers = slices.Collect(maps.Values(c.runtimeHandlers))
sort.SliceStable(resp.RuntimeHandlers, func(i, j int) bool {
return resp.RuntimeHandlers[i].Name < resp.RuntimeHandlers[j].Name
})

if r.Verbose {
configByt, err := json.Marshal(c.config)
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions internal/cri/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@
package server

import (
"context"
"reflect"
"testing"
"unsafe"

"github.com/containerd/containerd/api/services/introspection/v1"
containerd "github.com/containerd/containerd/v2/client"
coreintrospection "github.com/containerd/containerd/v2/core/introspection"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
Expand Down Expand Up @@ -48,3 +54,86 @@ func TestRuntimeConditionContainerdHasNoDeprecationWarnings(t *testing.T) {
Status: true,
}, cond)
}

// fakeIntrospectionService is a minimal stub that implements the
// coreintrospection.Service. We need this because criService.Status()
// invokes the client.IntrospectionService() method.
type fakeIntrospectionService struct{}

var _ coreintrospection.Service = fakeIntrospectionService{}

func (fakeIntrospectionService) Plugins(ctx context.Context, _ ...string) (*introspection.PluginsResponse, error) {
return &introspection.PluginsResponse{}, nil
}

func (fakeIntrospectionService) Server(ctx context.Context) (*introspection.ServerResponse, error) {
return &introspection.ServerResponse{}, nil
}

func (fakeIntrospectionService) PluginInfo(ctx context.Context, _ string, _ string, _ any) (*introspection.PluginInfoResponse, error) {
return &introspection.PluginInfoResponse{}, nil
}

// newFakeContainerdClient returns a *containerd.Client with a stub
// IntrospectionService injected via reflection. This avoids needing a real
// gRPC connection while satisfying criService.Status().
func newFakeContainerdClient() *containerd.Client {
c := &containerd.Client{}
sv := reflect.ValueOf(c).Elem().FieldByName("services")
if !sv.IsValid() {
return c
}
f := sv.FieldByName("introspectionService")
if !f.IsValid() {
return c
}
// Make the unexported/private field introspectionService settable
f = reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem()
f.Set(reflect.ValueOf(fakeIntrospectionService{}).Convert(f.Type()))
return c
}

// newStatusTestCRIService creates a minimal CRI service for testing
func newStatusTestCRIService() *criService {
return &criService{
client: newFakeContainerdClient(),
runtimeHandlers: make(map[string]*runtime.RuntimeHandler),
}
}

// TestStatusRuntimeHandlersOrdering checks that the runtime handlers
// returned by Status() are in the same order every time
func TestStatusRuntimeHandlersOrdering(t *testing.T) {
c := newStatusTestCRIService()

// Forge many runtime handlers to lower risk of accidental stable
// ordering on consecutive Status() calls
const numHandlers = 100
handlers := make(map[string]*runtime.RuntimeHandler, numHandlers)
for range numHandlers {
h := &runtime.RuntimeHandler{Name: "random-" + uuid.New().String()}
handlers[h.Name] = h
}
c.runtimeHandlers = handlers

// Call Status() twice
resp1, err := c.Status(context.Background(), &runtime.StatusRequest{})
assert.NoError(t, err)
assert.Len(t, resp1.RuntimeHandlers, len(handlers), "Unexpected number of runtime handlers")

resp2, err := c.Status(context.Background(), &runtime.StatusRequest{})
assert.NoError(t, err)
assert.Len(t, resp2.RuntimeHandlers, len(handlers), "Unexpected number of runtime handlers")

// Check runtime handlers are in the same order
sameOrder := true
for i := 0; i < len(resp1.RuntimeHandlers); i++ {
if resp1.RuntimeHandlers[i].Name != resp2.RuntimeHandlers[i].Name {
sameOrder = false
break
}
}

// Fail if runtime handlers order varies across calls to Status()
assert.True(t, sameOrder, "RuntimeHandlers order is unstable across Status() calls")
}
9 changes: 9 additions & 0 deletions pkg/imageverifier/bindir/bindir.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/containerd/containerd/v2/internal/tomlext"
"github.com/containerd/containerd/v2/pkg/imageverifier"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand Down Expand Up @@ -123,6 +124,14 @@ func (v *ImageVerifier) runVerifier(ctx context.Context, bin string, imageName s

cmd := exec.CommandContext(ctx, binPath, args...)

// Attach OTEL propagators trace context env var to the child process
if traceContext, err := tracing.GetPropagatorsTraceContext(ctx); err != nil {
log.G(ctx).Warn("could not marshall propagators trace context", err)
} else {
traceContextEnv := fmt.Sprintf("OTEL_PROPAGATORS_TRACE_CONTEXT=%s", traceContext)
cmd.Env = append(os.Environ(), traceContextEnv)
}

// We construct our own pipes instead of using the default StdinPipe,
// StoutPipe, and StderrPipe in order to set timeouts on reads and writes.
stdinRead, stdinWrite, err := os.Pipe()
Expand Down
35 changes: 33 additions & 2 deletions pkg/tracing/plugin/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"os"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/v2/pkg/deprecation"
Expand Down Expand Up @@ -52,8 +53,10 @@ const (
otlpProtocolEnv = "OTEL_EXPORTER_OTLP_PROTOCOL"
otlpTracesProtocolEnv = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"

otelTracesExporterEnv = "OTEL_TRACES_EXPORTER"
otelServiceNameEnv = "OTEL_SERVICE_NAME"
otelTracesExporterEnv = "OTEL_TRACES_EXPORTER"
otelServiceNameEnv = "OTEL_SERVICE_NAME"
otelTracesSamplerEnv = "OTEL_TRACES_SAMPLER"
otelTracesSamplerArgEnv = "OTEL_TRACES_SAMPLER_ARG"
)

func init() {
Expand Down Expand Up @@ -193,6 +196,15 @@ func newTracer(ctx context.Context, procs []trace.SpanProcessor) (io.Closer, err
for _, proc := range procs {
opts = append(opts, trace.WithSpanProcessor(proc))
}

// Configure custom NameBased sampler if specified in the env
if nameSampler := nameSamplerFromEnv(); nameSampler != nil {
opts = append(opts, trace.WithSampler(nameSampler))
// Unset the env vars so that otel sdk does not attempt to configure the sampler automatically
os.Unsetenv(otelTracesSamplerEnv)
os.Unsetenv(otelTracesSamplerArgEnv)
}

provider := trace.NewTracerProvider(opts...)
otel.SetTracerProvider(provider)

Expand All @@ -202,6 +214,25 @@ func newTracer(ctx context.Context, procs []trace.SpanProcessor) (io.Closer, err

}

func nameSamplerFromEnv() trace.Sampler {
sampler, ok := os.LookupEnv(otelTracesSamplerEnv)
if !ok {
return nil
}

sampler = strings.ToLower(strings.TrimSpace(sampler))
allowedNames := strings.Split(strings.TrimSpace(os.Getenv(otelTracesSamplerArgEnv)), ",")

switch sampler {
case samplerNameBased:
return NameBased(allowedNames)
case samplerParentBasedName:
return trace.ParentBased(NameBased(allowedNames))
default:
return nil
}
}

func warnTraceConfig(ic *plugin.InitContext) error {
if ic.Config == nil {
return nil
Expand Down
51 changes: 51 additions & 0 deletions pkg/tracing/plugin/sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package plugin

import (
"fmt"

sdkTrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

const (
samplerNameBased = "namebased"
samplerParentBasedName = "parentbased_name"
)

type NameSampler struct {
// allow is a set of names that should be sampled.
// Uses a map of empty structs for O(1) lookups and no memory overhead.
allow map[string]struct{}
}

// NameBased returns a Sampler that samples every span having a certain name.
// It should be used in conjunction with the ParentBased sampler so that the child spans are also sampled.
func NameBased(allowedNames []string) NameSampler {
allowedNamesMap := make(map[string]struct{}, len(allowedNames))
for _, name := range allowedNames {
allowedNamesMap[name] = struct{}{}
}
return NameSampler{
allow: allowedNamesMap,
}
}

func (ns NameSampler) ShouldSample(parameters sdkTrace.SamplingParameters) sdkTrace.SamplingResult {
psc := trace.SpanContextFromContext(parameters.ParentContext)

if _, ok := ns.allow[parameters.Name]; ok {
return sdkTrace.SamplingResult{
Decision: sdkTrace.RecordAndSample,
Tracestate: psc.TraceState(),
}
}

return sdkTrace.SamplingResult{
Decision: sdkTrace.Drop,
Tracestate: psc.TraceState(),
}
}

func (ns NameSampler) Description() string {
return fmt.Sprintf("NameBased:{%v}", ns.allow)
}
10 changes: 10 additions & 0 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package tracing

import (
"context"
"encoding/json"
"net/http"
"strings"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -130,3 +132,11 @@ func Attribute(k string, v any) attribute.KeyValue {
func HTTPStatusCodeAttributes(code int) []attribute.KeyValue {
return []attribute.KeyValue{semconv.HTTPStatusCodeKey.Int(code)}
}

// GetPropagatorsTraceContext returns the current propagators trace context as a JSON string
func GetPropagatorsTraceContext(ctx context.Context) ([]byte, error) {
propagator := propagation.TraceContext{}
carrier := propagation.MapCarrier{}
propagator.Inject(ctx, carrier)
return json.Marshal(carrier)
}