diff --git a/CHANGELOG.md b/CHANGELOG.md index 683f10a018..962b065f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4058](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4058)) - `opentelemetry-instrumentation-django`: Replace SpanAttributes with semconv constants where applicable ([#4059](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4059)) +- `opentelemetry-instrumentation-confluent-kafka`: Replace SpanAttributes with semconv constants where applicable + ([#4057](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4057)) ## Version 1.39.0/0.60b0 (2025-12-03) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index db422db7f6..60f1b26466 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -110,7 +110,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap -from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MessagingOperationTypeValues, +) from opentelemetry.trace import Tracer from .package import _instruments @@ -363,7 +365,7 @@ def wrap_produce(func, instance, tracer, args, kwargs): _enrich_span( span, topic, - operation=MessagingOperationValues.RECEIVE, + operation=MessagingOperationTypeValues.RECEIVE, ) # Replace propagate.inject( headers, @@ -387,7 +389,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): record.topic(), record.partition(), record.offset(), - operation=MessagingOperationValues.PROCESS, + operation=MessagingOperationTypeValues.PROCESS, ) instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) @@ -409,7 +411,7 @@ def wrap_consume(func, instance, tracer, args, kwargs): _enrich_span( instance._current_consume_span, records[0].topic(), - operation=MessagingOperationValues.PROCESS, + operation=MessagingOperationTypeValues.PROCESS, ) instance._current_context_token = context.attach( diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index fd8b72848d..f7b5c059bb 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -3,9 +3,14 @@ from opentelemetry import context, propagate from opentelemetry.propagators import textmap +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, + MessagingOperationTypeValues, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, - MessagingOperationValues, SpanAttributes, ) from opentelemetry.trace import Link, SpanKind @@ -115,12 +120,12 @@ def _enrich_span( topic, partition: Optional[int] = None, offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + operation: Optional[MessagingOperationTypeValues] = None, ): if not span.is_recording(): return - span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") + span.set_attribute(MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) if partition is not None: @@ -132,7 +137,7 @@ def _enrich_span( ) if operation: - span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + span.set_attribute(MESSAGING_OPERATION, operation.value) else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) @@ -140,7 +145,7 @@ def _enrich_span( # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition is not None and offset is not None and topic: span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, + MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}", ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 986116900d..725f73cc2c 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -25,6 +25,11 @@ KafkaContextGetter, KafkaContextSetter, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -122,36 +127,36 @@ def test_poll(self) -> None: { "name": "topic-10 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-10", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + MESSAGING_MESSAGE_ID: "topic-10.0.0", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-20 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-20", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + MESSAGING_MESSAGE_ID: "topic-20.2.4", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-30 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-30", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, {"name": "recv", "attributes": {}}, @@ -190,8 +195,8 @@ def test_consume(self) -> None: { "name": "topic-1 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -200,8 +205,8 @@ def test_consume(self) -> None: { "name": "topic-2 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -210,8 +215,8 @@ def test_consume(self) -> None: { "name": "topic-3 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -247,12 +252,12 @@ def test_close(self) -> None: { "name": "topic-a process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-a", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + MESSAGING_MESSAGE_ID: "topic-a.0.0", }, }, ]