From 8839242d284d961138f8fb0949bf6166eb6da619 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Fri, 3 Mar 2023 12:50:53 -0800 Subject: [PATCH 1/2] chore: make it easier to pass resource agent name and version This will make it easier for consumers of this module to pass thheir own agent names and versions. --- instrumentation/opentelemetry/init.go | 34 ++++++++++++------- .../opentelemetry/init_additional.go | 2 +- instrumentation/opentelemetry/init_test.go | 5 +-- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 40cfe1b..d05f5ce 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -208,12 +208,13 @@ func createCaCertPoolFromFile(certFile string) *x509.CertPool { // Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately // on a termination signal. func Init(cfg *config.AgentConfig) func() { - return InitWithSpanProcessorWrapper(cfg, nil) + return InitWithSpanProcessorWrapper(cfg, nil, versionInfoResourceAttributes) } // InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor // and returns a shutdown function to flush data immediately on a termination signal. -func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper) func() { +func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper, + versionInfoGetter func() []attribute.KeyValue) func() { mu.Lock() defer mu.Unlock() if initialized { @@ -237,7 +238,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor } // Initialize metrics - metricsShutdownFn := initializeMetrics(cfg) + metricsShutdownFn := initializeMetrics(cfg, versionInfoGetter) exporterFactory = makeExporterFactory(cfg) @@ -256,7 +257,8 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor resources, err := resource.New( context.Background(), - resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)...), + resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, + versionInfoGetter)...), ) if err != nil { log.Fatal(err) @@ -310,29 +312,37 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor } } -func createResources(serviceName string, resources map[string]string) []attribute.KeyValue { +func createResources(serviceName string, resources map[string]string, + versionInfoGetter func() []attribute.KeyValue) []attribute.KeyValue { retValues := []attribute.KeyValue{ semconv.ServiceNameKey.String(serviceName), - semconv.TelemetrySDKNameKey.String("hypertrace"), - semconv.TelemetrySDKVersionKey.String(version.Version), semconv.TelemetrySDKLanguageGo, } + retValues = append(retValues, versionInfoGetter()...) + for k, v := range resources { retValues = append(retValues, attribute.String(k, v)) } return retValues } +func versionInfoResourceAttributes() []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.TelemetrySDKNameKey.String("hypertrace"), + semconv.TelemetrySDKVersionKey.String(version.Version), + } +} + // RegisterService creates tracerprovider for a new service and returns a func which can be used to create spans and the TracerProvider func RegisterService(serviceName string, resourceAttributes map[string]string) (sdk.StartSpan, trace.TracerProvider, error) { - return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil) + return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil, versionInfoResourceAttributes) } // RegisterServiceWithSpanProcessorWrapper creates a tracerprovider for a new service with a wrapper over opentelemetry span processor // and returns a func which can be used to create spans and the TracerProvider func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttributes map[string]string, - wrapper SpanProcessorWrapper) (sdk.StartSpan, trace.TracerProvider, error) { + wrapper SpanProcessorWrapper, versionInfoGetter func() []attribute.KeyValue) (sdk.StartSpan, trace.TracerProvider, error) { mu.Lock() defer mu.Unlock() if !initialized { @@ -362,7 +372,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu resources, err := resource.New( context.Background(), - resource.WithAttributes(createResources(serviceName, resourceAttributes)...), + resource.WithAttributes(createResources(serviceName, resourceAttributes, versionInfoGetter)...), ) if err != nil { log.Fatal(err) @@ -379,7 +389,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu }), tp, nil } -func initializeMetrics(cfg *config.AgentConfig) func() { +func initializeMetrics(cfg *config.AgentConfig, versionInfoGetter func() []attribute.KeyValue) func() { if shouldDisableMetrics(cfg) { return func() {} } @@ -391,7 +401,7 @@ func initializeMetrics(cfg *config.AgentConfig) func() { } periodicReader := metric.NewPeriodicReader(metricsExporter) - resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes) + resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoGetter) resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) if err != nil { diff --git a/instrumentation/opentelemetry/init_additional.go b/instrumentation/opentelemetry/init_additional.go index ed55b9b..90fa202 100644 --- a/instrumentation/opentelemetry/init_additional.go +++ b/instrumentation/opentelemetry/init_additional.go @@ -34,7 +34,7 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) { if cfg.GetServiceName().GetValue() != "" { resource, err := resource.New( context.Background(), - resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)...), + resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoResourceAttributes)...), ) if err != nil { log.Fatal(err) diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index 51a6ff2..d706bcd 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -365,7 +365,7 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) { cfg.Reporting.Endpoint = config.String(srv.URL) wrapper := &mockSpanProcessorWrapper{} - shutdown := InitWithSpanProcessorWrapper(cfg, wrapper) + shutdown := InitWithSpanProcessorWrapper(cfg, wrapper, versionInfoResourceAttributes) defer shutdown() // test wrapper is called for spans created by global trace provider @@ -380,7 +380,8 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) { assert.Equal(t, 2, wrapper.onEndCount) // test wrapper is called for spans created by service trace provider - startSpan, _, err := RegisterServiceWithSpanProcessorWrapper("custom_service", map[string]string{"test1": "val1"}, wrapper) + startSpan, _, err := RegisterServiceWithSpanProcessorWrapper("custom_service", map[string]string{"test1": "val1"}, wrapper, + versionInfoResourceAttributes) if err != nil { log.Fatalf("Error while initializing service: %v", err) } From 1f159185ae39d5b463f092367e72627f4d518401 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Mon, 6 Mar 2023 06:33:13 -0800 Subject: [PATCH 2/2] use an attribute array instead of a function --- instrumentation/opentelemetry/init.go | 48 +++++++++---------- .../opentelemetry/init_additional.go | 2 +- instrumentation/opentelemetry/init_test.go | 4 +- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index d05f5ce..3fe20a5 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -39,15 +39,18 @@ import ( "google.golang.org/grpc/resolver" ) -var batchTimeout = time.Duration(200) * time.Millisecond - var ( - traceProviders map[string]*sdktrace.TracerProvider - globalSampler sdktrace.Sampler - initialized = false - enabled = false - mu sync.Mutex - exporterFactory func() (sdktrace.SpanExporter, error) + batchTimeout = time.Duration(200) * time.Millisecond + traceProviders map[string]*sdktrace.TracerProvider + globalSampler sdktrace.Sampler + initialized = false + enabled = false + mu sync.Mutex + exporterFactory func() (sdktrace.SpanExporter, error) + versionInfoAttributes = []attribute.KeyValue{ + semconv.TelemetrySDKNameKey.String("hypertrace"), + semconv.TelemetrySDKVersionKey.String(version.Version), + } ) func makePropagator(formats []config.PropagationFormat) propagation.TextMapPropagator { @@ -208,13 +211,13 @@ func createCaCertPoolFromFile(certFile string) *x509.CertPool { // Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately // on a termination signal. func Init(cfg *config.AgentConfig) func() { - return InitWithSpanProcessorWrapper(cfg, nil, versionInfoResourceAttributes) + return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes) } // InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor // and returns a shutdown function to flush data immediately on a termination signal. func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper, - versionInfoGetter func() []attribute.KeyValue) func() { + versionInfoAttrs []attribute.KeyValue) func() { mu.Lock() defer mu.Unlock() if initialized { @@ -238,7 +241,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor } // Initialize metrics - metricsShutdownFn := initializeMetrics(cfg, versionInfoGetter) + metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs) exporterFactory = makeExporterFactory(cfg) @@ -258,7 +261,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor resources, err := resource.New( context.Background(), resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, - versionInfoGetter)...), + versionInfoAttrs)...), ) if err != nil { log.Fatal(err) @@ -313,13 +316,13 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor } func createResources(serviceName string, resources map[string]string, - versionInfoGetter func() []attribute.KeyValue) []attribute.KeyValue { + versionInfo []attribute.KeyValue) []attribute.KeyValue { retValues := []attribute.KeyValue{ semconv.ServiceNameKey.String(serviceName), semconv.TelemetrySDKLanguageGo, } - retValues = append(retValues, versionInfoGetter()...) + retValues = append(retValues, versionInfo...) for k, v := range resources { retValues = append(retValues, attribute.String(k, v)) @@ -327,22 +330,15 @@ func createResources(serviceName string, resources map[string]string, return retValues } -func versionInfoResourceAttributes() []attribute.KeyValue { - return []attribute.KeyValue{ - semconv.TelemetrySDKNameKey.String("hypertrace"), - semconv.TelemetrySDKVersionKey.String(version.Version), - } -} - // RegisterService creates tracerprovider for a new service and returns a func which can be used to create spans and the TracerProvider func RegisterService(serviceName string, resourceAttributes map[string]string) (sdk.StartSpan, trace.TracerProvider, error) { - return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil, versionInfoResourceAttributes) + return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil, versionInfoAttributes) } // RegisterServiceWithSpanProcessorWrapper creates a tracerprovider for a new service with a wrapper over opentelemetry span processor // and returns a func which can be used to create spans and the TracerProvider func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttributes map[string]string, - wrapper SpanProcessorWrapper, versionInfoGetter func() []attribute.KeyValue) (sdk.StartSpan, trace.TracerProvider, error) { + wrapper SpanProcessorWrapper, versionInfoAttrs []attribute.KeyValue) (sdk.StartSpan, trace.TracerProvider, error) { mu.Lock() defer mu.Unlock() if !initialized { @@ -372,7 +368,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu resources, err := resource.New( context.Background(), - resource.WithAttributes(createResources(serviceName, resourceAttributes, versionInfoGetter)...), + resource.WithAttributes(createResources(serviceName, resourceAttributes, versionInfoAttrs)...), ) if err != nil { log.Fatal(err) @@ -389,7 +385,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu }), tp, nil } -func initializeMetrics(cfg *config.AgentConfig, versionInfoGetter func() []attribute.KeyValue) func() { +func initializeMetrics(cfg *config.AgentConfig, versionInfoAttrs []attribute.KeyValue) func() { if shouldDisableMetrics(cfg) { return func() {} } @@ -401,7 +397,7 @@ func initializeMetrics(cfg *config.AgentConfig, versionInfoGetter func() []attri } periodicReader := metric.NewPeriodicReader(metricsExporter) - resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoGetter) + resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttrs) resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) if err != nil { diff --git a/instrumentation/opentelemetry/init_additional.go b/instrumentation/opentelemetry/init_additional.go index 90fa202..838167e 100644 --- a/instrumentation/opentelemetry/init_additional.go +++ b/instrumentation/opentelemetry/init_additional.go @@ -34,7 +34,7 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) { if cfg.GetServiceName().GetValue() != "" { resource, err := resource.New( context.Background(), - resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoResourceAttributes)...), + resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttributes)...), ) if err != nil { log.Fatal(err) diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index d706bcd..cdbb194 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -365,7 +365,7 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) { cfg.Reporting.Endpoint = config.String(srv.URL) wrapper := &mockSpanProcessorWrapper{} - shutdown := InitWithSpanProcessorWrapper(cfg, wrapper, versionInfoResourceAttributes) + shutdown := InitWithSpanProcessorWrapper(cfg, wrapper, versionInfoAttributes) defer shutdown() // test wrapper is called for spans created by global trace provider @@ -381,7 +381,7 @@ func TestInitWithSpanProcessorWrapper(t *testing.T) { // test wrapper is called for spans created by service trace provider startSpan, _, err := RegisterServiceWithSpanProcessorWrapper("custom_service", map[string]string{"test1": "val1"}, wrapper, - versionInfoResourceAttributes) + versionInfoAttributes) if err != nil { log.Fatalf("Error while initializing service: %v", err) }