From b2ef1f4e871375bbb686ffa04cf5872b396eb18f Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Thu, 25 Dec 2025 01:10:49 -0500 Subject: [PATCH 1/7] refactor: Replace SpanAttributes with semconv constants --- .../instrumentation/confluent_kafka/utils.py | 8 ++++-- .../tests/test_instrumentation.py | 26 +++++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index fd8b72848d..fb0007e1e2 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -3,6 +3,10 @@ from opentelemetry import context, propagate from opentelemetry.propagators import textmap +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_ID, + MESSAGING_SYSTEM, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, MessagingOperationValues, @@ -120,7 +124,7 @@ def _enrich_span( if not span.is_recording(): return - span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") + span.set_attribute(MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) if partition is not None: @@ -140,7 +144,7 @@ def _enrich_span( # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition is not None and offset is not None and topic: span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, + MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}", ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 986116900d..2cda223f42 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -25,6 +25,10 @@ KafkaContextGetter, KafkaContextSetter, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_ID, + MESSAGING_SYSTEM, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -124,10 +128,10 @@ def test_poll(self) -> None: "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-10", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + MESSAGING_MESSAGE_ID: "topic-10.0.0", }, }, {"name": "recv", "attributes": {}}, @@ -136,10 +140,10 @@ def test_poll(self) -> None: "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-20", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + MESSAGING_MESSAGE_ID: "topic-20.2.4", }, }, {"name": "recv", "attributes": {}}, @@ -148,10 +152,10 @@ def test_poll(self) -> None: "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-30", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, {"name": "recv", "attributes": {}}, @@ -191,7 +195,7 @@ def test_consume(self) -> None: "name": "topic-1 process", "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -201,7 +205,7 @@ def test_consume(self) -> None: "name": "topic-2 process", "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -211,7 +215,7 @@ def test_consume(self) -> None: "name": "topic-3 process", "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -249,10 +253,10 @@ def test_close(self) -> None: "attributes": { SpanAttributes.MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-a", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + MESSAGING_MESSAGE_ID: "topic-a.0.0", }, }, ] From 8526aa5068dd1322277c2e1268ffe3c51160251a Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Thu, 25 Dec 2025 01:18:40 -0500 Subject: [PATCH 2/7] update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 683f10a018..88fd41e76c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -122,6 +122,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3681](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3681)) - `opentelemetry-instrumentation-flask`: Fix exemplars generation for `http.server.request.duration` and `http.server.duration` metrics ([#3912](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3912)) +- `opentelemetry-instrumentation-confluent-kafka`: Replace SpanAttributes with semconv constants where applicable + ([#4057](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4057)) ### Added From 768e70f3b178f31693591f40149c8ab37e9ffe40 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 29 Dec 2025 12:57:39 -0500 Subject: [PATCH 3/7] fix CHANGELOG.md --- CHANGELOG.md | 4 ++-- .../tests/test_instrumentation.py | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88fd41e76c..962b065f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4058](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4058)) - `opentelemetry-instrumentation-django`: Replace SpanAttributes with semconv constants where applicable ([#4059](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4059)) +- `opentelemetry-instrumentation-confluent-kafka`: Replace SpanAttributes with semconv constants where applicable + ([#4057](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4057)) ## Version 1.39.0/0.60b0 (2025-12-03) @@ -122,8 +124,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3681](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3681)) - `opentelemetry-instrumentation-flask`: Fix exemplars generation for `http.server.request.duration` and `http.server.duration` metrics ([#3912](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3912)) -- `opentelemetry-instrumentation-confluent-kafka`: Replace SpanAttributes with semconv constants where applicable - ([#4057](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4057)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 2cda223f42..725f73cc2c 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -27,6 +27,7 @@ ) from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, MESSAGING_SYSTEM, ) from opentelemetry.semconv.trace import ( @@ -126,7 +127,7 @@ def test_poll(self) -> None: { "name": "topic-10 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-10", @@ -138,7 +139,7 @@ def test_poll(self) -> None: { "name": "topic-20 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-20", @@ -150,7 +151,7 @@ def test_poll(self) -> None: { "name": "topic-30 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-30", @@ -194,7 +195,7 @@ def test_consume(self) -> None: { "name": "topic-1 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, @@ -204,7 +205,7 @@ def test_consume(self) -> None: { "name": "topic-2 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, @@ -214,7 +215,7 @@ def test_consume(self) -> None: { "name": "topic-3 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, @@ -251,7 +252,7 @@ def test_close(self) -> None: { "name": "topic-a process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", + MESSAGING_OPERATION: "process", SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-a", From 4f29d4067a9e9ffded6f5e02fb7785866d88b9a0 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 30 Dec 2025 09:33:34 +0100 Subject: [PATCH 4/7] Update utils.py --- .../opentelemetry/instrumentation/confluent_kafka/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index fb0007e1e2..ad6a2923a9 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -6,10 +6,11 @@ from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( MESSAGING_MESSAGE_ID, MESSAGING_SYSTEM, + MESSAGING_OPERATION, + MessagingOperationTypeValues, ) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, - MessagingOperationValues, SpanAttributes, ) from opentelemetry.trace import Link, SpanKind @@ -119,7 +120,7 @@ def _enrich_span( topic, partition: Optional[int] = None, offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + operation: Optional[MessagingOperationTypeValues] = None, ): if not span.is_recording(): return @@ -136,7 +137,7 @@ def _enrich_span( ) if operation: - span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + span.set_attribute(MESSAGING_OPERATION, operation.value) else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) From 25bb3266d199d2ae96dfe1c97cbed35bd3110990 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 30 Dec 2025 09:35:31 +0100 Subject: [PATCH 5/7] Update __init__.py --- .../instrumentation/confluent_kafka/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index db422db7f6..0ee416b571 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -110,7 +110,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap -from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.semconv._incubating.attributes.messaging_attributes import MessagingOperationTypeValues from opentelemetry.trace import Tracer from .package import _instruments @@ -363,7 +363,7 @@ def wrap_produce(func, instance, tracer, args, kwargs): _enrich_span( span, topic, - operation=MessagingOperationValues.RECEIVE, + operation=MessagingOperationTypeValues.RECEIVE, ) # Replace propagate.inject( headers, @@ -387,7 +387,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): record.topic(), record.partition(), record.offset(), - operation=MessagingOperationValues.PROCESS, + operation=MessagingOperationTypeValues.PROCESS, ) instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) @@ -409,7 +409,7 @@ def wrap_consume(func, instance, tracer, args, kwargs): _enrich_span( instance._current_consume_span, records[0].topic(), - operation=MessagingOperationValues.PROCESS, + operation=MessagingOperationTypeValues.PROCESS, ) instance._current_context_token = context.attach( From 6e35789d7a91f91bef58bda3e5188154d9100780 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 30 Dec 2025 09:42:48 +0100 Subject: [PATCH 6/7] Update utils.py --- .../src/opentelemetry/instrumentation/confluent_kafka/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index ad6a2923a9..f7b5c059bb 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -5,8 +5,8 @@ from opentelemetry.propagators import textmap from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( MESSAGING_MESSAGE_ID, - MESSAGING_SYSTEM, MESSAGING_OPERATION, + MESSAGING_SYSTEM, MessagingOperationTypeValues, ) from opentelemetry.semconv.trace import ( From c170d13079033acfffacc4e5c00d1b2e4a6c8c11 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 30 Dec 2025 09:46:47 +0100 Subject: [PATCH 7/7] Update __init__.py --- .../opentelemetry/instrumentation/confluent_kafka/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 0ee416b571..60f1b26466 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -110,7 +110,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap -from opentelemetry.semconv._incubating.attributes.messaging_attributes import MessagingOperationTypeValues +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MessagingOperationTypeValues, +) from opentelemetry.trace import Tracer from .package import _instruments