Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions instrumentation/opentelemetry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@ import (
"google.golang.org/grpc/resolver"
)

var batchTimeout = time.Duration(200) * time.Millisecond

var (
traceProviders map[string]*sdktrace.TracerProvider
globalSampler sdktrace.Sampler
initialized = false
enabled = false
mu sync.Mutex
exporterFactory func() (sdktrace.SpanExporter, error)
batchTimeout = time.Duration(200) * time.Millisecond
traceProviders map[string]*sdktrace.TracerProvider
globalSampler sdktrace.Sampler
initialized = false
enabled = false
mu sync.Mutex
exporterFactory func() (sdktrace.SpanExporter, error)
versionInfoAttributes = []attribute.KeyValue{
semconv.TelemetrySDKNameKey.String("hypertrace"),
semconv.TelemetrySDKVersionKey.String(version.Version),
}
)

func makePropagator(formats []config.PropagationFormat) propagation.TextMapPropagator {
Expand Down Expand Up @@ -208,12 +211,13 @@ func createCaCertPoolFromFile(certFile string) *x509.CertPool {
// Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately
// on a termination signal.
func Init(cfg *config.AgentConfig) func() {
return InitWithSpanProcessorWrapper(cfg, nil)
return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes)
}

// InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor
// and returns a shutdown function to flush data immediately on a termination signal.
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper) func() {
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what is the advantage of passing around the func rather than the slice?

Secondary is, can the versionInfo be const?

Copy link
Copy Markdown
Collaborator Author

@tim-mwangi tim-mwangi Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could make it a var which will be an attribute/key-pair array.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

versionInfoAttrs []attribute.KeyValue) func() {
mu.Lock()
defer mu.Unlock()
if initialized {
Expand All @@ -237,7 +241,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor
}

// Initialize metrics
metricsShutdownFn := initializeMetrics(cfg)
metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs)

exporterFactory = makeExporterFactory(cfg)

Expand All @@ -256,7 +260,8 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor

resources, err := resource.New(
context.Background(),
resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)...),
resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes,
versionInfoAttrs)...),
)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -310,14 +315,15 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor
}
}

func createResources(serviceName string, resources map[string]string) []attribute.KeyValue {
func createResources(serviceName string, resources map[string]string,
versionInfo []attribute.KeyValue) []attribute.KeyValue {
retValues := []attribute.KeyValue{
semconv.ServiceNameKey.String(serviceName),
semconv.TelemetrySDKNameKey.String("hypertrace"),
semconv.TelemetrySDKVersionKey.String(version.Version),
semconv.TelemetrySDKLanguageGo,
}

retValues = append(retValues, versionInfo...)

for k, v := range resources {
retValues = append(retValues, attribute.String(k, v))
}
Expand All @@ -326,13 +332,13 @@ func createResources(serviceName string, resources map[string]string) []attribut

// RegisterService creates tracerprovider for a new service and returns a func which can be used to create spans and the TracerProvider
func RegisterService(serviceName string, resourceAttributes map[string]string) (sdk.StartSpan, trace.TracerProvider, error) {
return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil)
return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil, versionInfoAttributes)
}

// RegisterServiceWithSpanProcessorWrapper creates a tracerprovider for a new service with a wrapper over opentelemetry span processor
// and returns a func which can be used to create spans and the TracerProvider
func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttributes map[string]string,
wrapper SpanProcessorWrapper) (sdk.StartSpan, trace.TracerProvider, error) {
wrapper SpanProcessorWrapper, versionInfoAttrs []attribute.KeyValue) (sdk.StartSpan, trace.TracerProvider, error) {
mu.Lock()
defer mu.Unlock()
if !initialized {
Expand Down Expand Up @@ -362,7 +368,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu

resources, err := resource.New(
context.Background(),
resource.WithAttributes(createResources(serviceName, resourceAttributes)...),
resource.WithAttributes(createResources(serviceName, resourceAttributes, versionInfoAttrs)...),
)
if err != nil {
log.Fatal(err)
Expand All @@ -379,7 +385,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu
}), tp, nil
}

func initializeMetrics(cfg *config.AgentConfig) func() {
func initializeMetrics(cfg *config.AgentConfig, versionInfoAttrs []attribute.KeyValue) func() {
if shouldDisableMetrics(cfg) {
return func() {}
}
Expand All @@ -391,7 +397,7 @@ func initializeMetrics(cfg *config.AgentConfig) func() {
}
periodicReader := metric.NewPeriodicReader(metricsExporter)

resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)
resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttrs)
resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue)
metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion instrumentation/opentelemetry/init_additional.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) {
if cfg.GetServiceName().GetValue() != "" {
resource, err := resource.New(
context.Background(),
resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)...),
resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttributes)...),
)
if err != nil {
log.Fatal(err)
Expand Down
5 changes: 3 additions & 2 deletions instrumentation/opentelemetry/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) {
cfg.Reporting.Endpoint = config.String(srv.URL)

wrapper := &mockSpanProcessorWrapper{}
shutdown := InitWithSpanProcessorWrapper(cfg, wrapper)
shutdown := InitWithSpanProcessorWrapper(cfg, wrapper, versionInfoAttributes)
defer shutdown()

// test wrapper is called for spans created by global trace provider
Expand All @@ -380,7 +380,8 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) {
assert.Equal(t, 2, wrapper.onEndCount)

// test wrapper is called for spans created by service trace provider
startSpan, _, err := RegisterServiceWithSpanProcessorWrapper("custom_service", map[string]string{"test1": "val1"}, wrapper)
startSpan, _, err := RegisterServiceWithSpanProcessorWrapper("custom_service", map[string]string{"test1": "val1"}, wrapper,
versionInfoAttributes)
if err != nil {
log.Fatalf("Error while initializing service: %v", err)
}
Expand Down