diff --git a/.circleci/config.yml b/.circleci/config.yml
index 611a2433..71711fb1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -52,6 +52,9 @@ commands:
       gevent:
         default: ""
         type: string
+      kafka:
+        default: ""
+        type: string
       tests:
         default: "tests"
         type: string
@@ -61,6 +64,7 @@ commands:
         environment:
           CASSANDRA_TEST: "<<parameters.cassandra>>"
           GEVENT_STARLETTE_TEST: "<<parameters.gevent>>"
+          KAFKA_TEST: "<<parameters.kafka>>"
         command: |
           . venv/bin/activate
           coverage run --source=instana -m pytest -v --junitxml=test-results <<parameters.tests>>
@@ -136,15 +140,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -173,15 +168,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -210,15 +196,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -248,15 +225,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -286,15 +254,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -338,15 +297,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -376,15 +326,6 @@ jobs:
         environment:
           PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
           PUBSUB_PROJECT1: test-project,test-topic
-      - image: public.ecr.aws/bitnami/kafka:3.9.0
-        environment:
-          KAFKA_CFG_NODE_ID: 0
-          KAFKA_CFG_PROCESS_ROLES: controller,broker
-          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
-          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
-          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
     working_directory: ~/repo
     steps:
       - checkout
@@ -443,6 +384,30 @@ jobs:
       - store-pytest-results
       - store-coverage-report
 
+  py312kafka:
+    docker:
+      - image: public.ecr.aws/docker/library/python:3.12
+      - image: public.ecr.aws/bitnami/kafka:3.9.0
+        environment:
+          KAFKA_CFG_NODE_ID: 0
+          KAFKA_CFG_PROCESS_ROLES: controller,broker
+          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
+          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
+          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
+          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
+    working_directory: ~/repo
+    steps:
+      - checkout
+      - check-if-tests-needed
+      - pip-install-deps:
+          requirements: "tests/requirements-kafka.txt"
+      - run-tests-with-coverage-report:
+          kafka: "true"
+          tests: "tests/clients/kafka/test*.py"
+      - store-pytest-results
+      - store-coverage-report
+
 workflows:
   version: 2
   build:
@@ -457,6 +422,7 @@ workflows:
       - py39cassandra
       - py39gevent_starlette
      - py312aws
+      - py312kafka
       - final_job:
           requires:
             - python38
diff --git a/.tekton/pipeline.yaml b/.tekton/pipeline.yaml
index 7ee73dc0..0191b203 100644
--- a/.tekton/pipeline.yaml
+++ b/.tekton/pipeline.yaml
@@ -87,3 +87,17 @@ spec:
       workspaces:
         - name: task-pvc
           workspace: python-tracer-ci-pipeline-pvc
+    - name: unittest-kafka
+      runAfter:
+        - clone
+      matrix:
+        params:
+          - name: imageDigest
+            value:
+              # public.ecr.aws/docker/library/python:3.12.9-bookworm
+              - "sha256:ae24158f83adcb3ec1dead14356e6debc9f3125167624408d95338faacc5cce3"
+      taskRef:
+        name: python-tracer-unittest-kafka-task
+      workspaces:
+        - name: task-pvc
+          workspace: python-tracer-ci-pipeline-pvc
diff --git a/.tekton/run_unittests.sh b/.tekton/run_unittests.sh
index 699116ca..d4e5103d 100755
--- a/.tekton/run_unittests.sh
+++ b/.tekton/run_unittests.sh
@@ -32,9 +32,13 @@ gevent_starlette)
 aws)
     export REQUIREMENTS='requirements.txt'
     export TESTS=('tests_aws') ;;
+kafka)
+    export REQUIREMENTS='requirements-kafka.txt'
+    export TESTS=('tests/clients/kafka')
+    export KAFKA_TEST='true' ;;
 *)
     echo "ERROR \$TEST_CONFIGURATION='${TEST_CONFIGURATION}' is unsupported " \
-         "not in (default|cassandra|gevent_starlette|aws)" >&2
+         "not in (default|cassandra|gevent_starlette|aws|kafka)" >&2
     exit 3 ;;
 esac
diff --git a/.tekton/task.yaml b/.tekton/task.yaml
index 9f07ad0c..b68593bf 100644
--- a/.tekton/task.yaml
+++ b/.tekton/task.yaml
@@ -131,25 +131,6 @@ spec:
     - name: rabbitmq
       # public.ecr.aws/docker/library/rabbitmq:3.13.0
       image: public.ecr.aws/docker/library/rabbitmq@sha256:39de1a4fc6c72d12bd5dfa23e8576536fd1c0cc8418344cd5a51addfc9a1145d
-    - name: kafka
-      # public.ecr.aws/bitnami/kafka:3.9.0
-      image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
-      env:
-        - name: KAFKA_CFG_NODE_ID
-          value: "0"
-        - name: KAFKA_CFG_PROCESS_ROLES
-          value: "controller,broker"
-        - name: KAFKA_CFG_LISTENERS
-          value: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
-        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
-          value: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
-        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
-          value: "0@kafka:9093"
-        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
-          value: "CONTROLLER"
-        - name: KAFKA_CFG_ADVERTISED_LISTENERS
-          value: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
   params:
     - name: imageDigest
       type: string
@@ -186,3 +167,43 @@ spec:
       workingDir: /workspace/python-sensor/
       command:
         - /workspace/python-sensor/.tekton/run_unittests.sh
+---
+apiVersion: tekton.dev/v1
+kind: Task
+metadata:
+  name: python-tracer-unittest-kafka-task
+spec:
+  sidecars:
+    - name: kafka
+      # public.ecr.aws/bitnami/kafka:3.9.0
+      image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
+      env:
+        - name: KAFKA_CFG_NODE_ID
+          value: "0"
+        - name: KAFKA_CFG_PROCESS_ROLES
+          value: "controller,broker"
+        - name: KAFKA_CFG_LISTENERS
+          value: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
+        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+          value: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
+          value: "0@kafka:9093"
+        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
+          value: "CONTROLLER"
+        - name: KAFKA_CFG_ADVERTISED_LISTENERS
+          value: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
+  params:
+    - name: imageDigest
+      type: string
+  workspaces:
+    - name: task-pvc
+      mountPath: /workspace
+  steps:
+    - name: unittest
+      image: public.ecr.aws/docker/library/python@$(params.imageDigest)
+      env:
+        - name: TEST_CONFIGURATION
+          value: kafka
+      workingDir: /workspace/python-sensor/
+      command:
+        - /workspace/python-sensor/.tekton/run_unittests.sh
diff --git a/src/instana/__init__.py b/src/instana/__init__.py
index 8c111576..5f26bc5a 100644
--- a/src/instana/__init__.py
+++ b/src/instana/__init__.py
@@ -205,6 +205,7 @@ def boot_agent() -> None:
         storage,  # noqa: F401
     )
     from instana.instrumentation.kafka import (
+        confluent_kafka_python,  # noqa: F401
         kafka_python,  # noqa: F401
     )
     from instana.instrumentation.tornado import (
diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py
new file mode 100644
index 00000000..1eef59c9
--- /dev/null
+++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py
@@ -0,0 +1,164 @@
+# (c) Copyright IBM Corp. 2025
+
+try:
+    from typing import Any, Callable, Dict, List, Optional, Tuple
+
+    import confluent_kafka  # noqa: F401
+    import wrapt
+    from confluent_kafka import Consumer, Producer
+    from opentelemetry.trace import SpanKind
+
+    from instana.log import logger
+    from instana.propagators.format import Format
+    from instana.util.traceutils import (
+        get_tracer_tuple,
+        tracing_is_off,
+    )
+
+    # As confluent_kafka is a wrapper around the C-developed librdkafka
+    # (provided automatically via binary wheels), we have to create new classes
+    # inheriting from the confluent_kafka package with the methods to be
+    # monkey-patched.
+    class InstanaConfluentKafkaProducer(Producer):
+        """
+        Wrapper class for confluent_kafka.Producer, which is an Asynchronous Kafka Producer.
+        """
+
+        def produce(
+            self,
+            topic: str,
+            *args: object,
+            **kwargs: Dict[str, Any],
+        ) -> None:
+            return super().produce(topic, *args, **kwargs)
+
+    class InstanaConfluentKafkaConsumer(Consumer):
+        """
+        Wrapper class for confluent_kafka.Consumer, which is a high-level Apache Kafka consumer.
+        """
+
+        def consume(
+            self, *args: object, **kwargs: Dict[str, Any]
+        ) -> List[confluent_kafka.Message]:
+            return super().consume(*args, **kwargs)
+
+        def poll(
+            self, timeout: Optional[float] = -1
+        ) -> Optional[confluent_kafka.Message]:
+            return super().poll(timeout)
+
+    def trace_kafka_produce(
+        wrapped: Callable[..., InstanaConfluentKafkaProducer.produce],
+        instance: InstanaConfluentKafkaProducer,
+        args: Tuple[int, str, Tuple[Any, ...]],
+        kwargs: Dict[str, Any],
+    ) -> None:
+        if tracing_is_off():
+            return wrapped(*args, **kwargs)
+
+        tracer, parent_span, _ = get_tracer_tuple()
+        parent_context = parent_span.get_span_context() if parent_span else None
+
+        with tracer.start_as_current_span(
+            "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
+        ) as span:
+            span.set_attribute("kafka.service", args[0])
+            span.set_attribute("kafka.access", "produce")
+
+            # context propagation
+            headers = args[6] if len(args) > 6 else kwargs.get("headers", {})
+            tracer.inject(
+                span.context,
+                Format.KAFKA_HEADERS,
+                headers,
+                disable_w3c_trace_context=True,
+            )
+
+            try:
+                res = wrapped(*args, **kwargs)
+            except Exception as exc:
+                span.record_exception(exc)
+            else:
+                return res
+
+    def trace_kafka_consume(
+        wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
+        instance: InstanaConfluentKafkaConsumer,
+        args: Tuple[int, str, Tuple[Any, ...]],
+        kwargs: Dict[str, Any],
+    ) -> List[confluent_kafka.Message]:
+        if tracing_is_off():
+            return wrapped(*args, **kwargs)
+
+        tracer, parent_span, _ = get_tracer_tuple()
+
+        parent_context = (
+            parent_span.get_span_context()
+            if parent_span
+            else tracer.extract(
+                Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
+            )
+        )
+
+        with tracer.start_as_current_span(
+            "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
+        ) as span:
+            span.set_attribute("kafka.access", "consume")
+
+            try:
+                res = wrapped(*args, **kwargs)
+                if isinstance(res, list) and len(res) > 0:
+                    span.set_attribute("kafka.service", res[0].topic())
+            except Exception as exc:
+                span.record_exception(exc)
+            else:
+                return res
+
+    def trace_kafka_poll(
+        wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
+        instance: InstanaConfluentKafkaConsumer,
+        args: Tuple[int, str, Tuple[Any, ...]],
+        kwargs: Dict[str, Any],
+    ) -> Optional[confluent_kafka.Message]:
+        if tracing_is_off():
+            return wrapped(*args, **kwargs)
+
+        tracer, parent_span, _ = get_tracer_tuple()
+
+        parent_context = (
+            parent_span.get_span_context()
+            if parent_span
+            else tracer.extract(
+                Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
+            )
+        )
+
+        with tracer.start_as_current_span(
+            "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
+        ) as span:
+            span.set_attribute("kafka.access", "poll")
+
+            try:
+                res = wrapped(*args, **kwargs)
+                if res:
+                    span.set_attribute("kafka.service", res.topic())
+            except Exception as exc:
+                span.record_exception(exc)
+            else:
+                return res
+
+    # Apply the monkey patch
+    confluent_kafka.Producer = InstanaConfluentKafkaProducer
+    confluent_kafka.Consumer = InstanaConfluentKafkaConsumer
+
+    wrapt.wrap_function_wrapper(
+        InstanaConfluentKafkaProducer, "produce", trace_kafka_produce
+    )
+    wrapt.wrap_function_wrapper(
+        InstanaConfluentKafkaConsumer, "consume", trace_kafka_consume
+    )
+    wrapt.wrap_function_wrapper(InstanaConfluentKafkaConsumer, "poll", trace_kafka_poll)
+
+    logger.debug("Instrumenting Kafka (confluent_kafka)")
+except ImportError:
+    pass
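Usage sketch (illustrative, not part of this patch): after boot_agent() runs, confluent_kafka.Producer and confluent_kafka.Consumer resolve to the Instana subclasses above, so unmodified client code is traced. The broker address, topic, and group id below are placeholders, and an active Instana agent is assumed:

    import confluent_kafka

    from instana.singletons import tracer

    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
    consumer = confluent_kafka.Consumer(
        {
            "bootstrap.servers": "localhost:9092",
            "group.id": "demo-group",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["demo-topic"])

    with tracer.start_as_current_span("demo"):
        # Creates a kafka-producer span ("produce") and injects trace headers.
        producer.produce("demo-topic", b"raw_bytes")
        producer.flush()
        # Creates a kafka-consumer span ("poll").
        msg = consumer.poll(timeout=5.0)

    consumer.close()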
diff --git a/src/instana/span/registered_span.py b/src/instana/span/registered_span.py
index a658f0b8..852cf8bd 100644
--- a/src/instana/span/registered_span.py
+++ b/src/instana/span/registered_span.py
@@ -372,3 +372,4 @@ def _collect_http_attributes(self, span: "InstanaSpan") -> None:
     def _collect_kafka_attributes(self, span: "InstanaSpan") -> None:
         self.data["kafka"]["service"] = span.attributes.pop("kafka.service", None)
         self.data["kafka"]["access"] = span.attributes.pop("kafka.access", None)
+        self.data["kafka"]["error"] = span.attributes.pop("kafka.error", None)
diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py
new file mode 100644
index 00000000..722b4611
--- /dev/null
+++ b/tests/clients/kafka/test_confluent_kafka.py
@@ -0,0 +1,193 @@
+# (c) Copyright IBM Corp. 2025
+
+from typing import Generator
+
+import pytest
+from confluent_kafka import (
+    Consumer,
+    KafkaException,
+    Producer,
+)
+from confluent_kafka.admin import AdminClient, NewTopic
+from opentelemetry.trace import SpanKind
+
+from instana.singletons import agent, tracer
+from tests.helpers import testenv
+
+
+class TestConfluentKafka:
+    @pytest.fixture(autouse=True)
+    def _resource(self) -> Generator[None, None, None]:
+        """SetUp and TearDown"""
+        # setup
+        # Clear all spans before a test run
+        self.recorder = tracer.span_processor
+        self.recorder.clear_spans()
+
+        # Kafka admin client
+        self.kafka_config = {"bootstrap.servers": testenv["kafka_bootstrap_servers"][0]}
+        self.kafka_client = AdminClient(self.kafka_config)
+
+        try:
+            topics = self.kafka_client.create_topics(  # noqa: F841
+                [
+                    NewTopic(
+                        testenv["kafka_topic"],
+                        num_partitions=1,
+                        replication_factor=1,
+                    ),
+                ]
+            )
+        except KafkaException:
+            pass
+
+        # Kafka producer
+        self.producer = Producer(self.kafka_config)
+        yield
+        # teardown
+        # Ensure that allow_exit_as_root has the default value
+        agent.options.allow_exit_as_root = False
+        # Close connections
+        self.kafka_client.delete_topics([testenv["kafka_topic"]])
+
+    def test_trace_confluent_kafka_produce(self) -> None:
+        with tracer.start_as_current_span("test"):
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes")
+            self.producer.flush(timeout=10)
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.CLIENT
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "produce"
+
+    def test_trace_confluent_kafka_consume(self) -> None:
+        # Produce some events
+        self.producer.produce(testenv["kafka_topic"], value=b"raw_bytes1")
+        self.producer.flush(timeout=30)
+
+        # Consume the events
+        consumer_config = self.kafka_config.copy()
+        consumer_config["group.id"] = "my-group"
+        consumer_config["auto.offset.reset"] = "earliest"
+
+        consumer = Consumer(consumer_config)
+        consumer.subscribe([testenv["kafka_topic"]])
+
+        with tracer.start_as_current_span("test"):
+            msgs = consumer.consume(num_messages=1, timeout=60)  # noqa: F841
+
+        consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.SERVER
+        assert kafka_span.data["kafka"]["access"] == "consume"
+
+    def test_trace_confluent_kafka_poll(self) -> None:
+        # Produce some events
+        self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
+        self.producer.flush(timeout=30)
+
+        # Consume the events
+        consumer_config = self.kafka_config.copy()
+        consumer_config["group.id"] = "my-group"
+        consumer_config["auto.offset.reset"] = "earliest"
+
+        consumer = Consumer(consumer_config)
+        consumer.subscribe([testenv["kafka_topic"]])
+
+        with tracer.start_as_current_span("test"):
+            msg = consumer.poll(timeout=60)  # noqa: F841
+
+        consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.SERVER
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "poll"
+
+    def test_trace_confluent_kafka_error(self) -> None:
+        # Consume the events
+        consumer_config = {"bootstrap.servers": ["some_inexistent_host:9094"]}
+        consumer_config["group.id"] = "my-group"
+        consumer_config["auto.offset.reset"] = "earliest"
+
+        consumer = Consumer(consumer_config)
+        consumer.subscribe(["inexistent_kafka_topic"])
+
+        with tracer.start_as_current_span("test"):
+            consumer.consume(-10)
+
+        consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[len(spans) - 1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert kafka_span.ec == 1
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.SERVER
+        assert not kafka_span.data["kafka"]["service"]
+        assert kafka_span.data["kafka"]["access"] == "consume"
+        assert (
+            kafka_span.data["kafka"]["error"]
+            == "num_messages must be between 0 and 1000000 (1M)"
+        )
diff --git a/tests/clients/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py
similarity index 72%
rename from tests/clients/test_kafka_python.py
rename to tests/clients/kafka/test_kafka_python.py
index 9c47b7ab..f5b9de1b 100644
--- a/tests/clients/test_kafka_python.py
+++ b/tests/clients/kafka/test_kafka_python.py
@@ -12,7 +12,7 @@
 from tests.helpers import testenv
 
 
-class TestKafkaPythonProducer:
+class TestKafkaPython:
     @pytest.fixture(autouse=True)
     def _resource(self) -> Generator[None, None, None]:
         """SetUp and TearDown"""
@@ -53,7 +53,7 @@ def _resource(self) -> Generator[None, None, None]:
         self.kafka_client.delete_topics([testenv["kafka_topic"]])
         self.kafka_client.close()
 
-    def test_trace_kafka_send(self) -> None:
+    def test_trace_kafka_python_send(self) -> None:
         with tracer.start_as_current_span("test"):
             future = self.producer.send(testenv["kafka_topic"], b"raw_bytes")
 
@@ -80,7 +80,7 @@ def test_trace_kafka_send(self) -> None:
         assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
         assert kafka_span.data["kafka"]["access"] == "send"
 
-    def test_trace_kafka_consume(self) -> None:
+    def test_trace_kafka_python_consume(self) -> None:
         agent.options.allow_exit_as_root = False
 
         # Produce some events
@@ -124,3 +124,44 @@ def test_trace_kafka_consume(self) -> None:
         assert kafka_span.k == SpanKind.SERVER
         assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
         assert kafka_span.data["kafka"]["access"] == "consume"
+
+    def test_trace_kafka_python_error(self) -> None:
+        agent.options.allow_exit_as_root = False
+
+        # Consume the events
+        consumer = KafkaConsumer(
+            "inexistent_kafka_topic",
+            bootstrap_servers=testenv["kafka_bootstrap_servers"],
+            auto_offset_reset="earliest",  # consume earliest available messages
+            enable_auto_commit=False,  # do not auto-commit offsets
+            consumer_timeout_ms=1000,
+        )
+
+        with tracer.start_as_current_span("test"):
+            for msg in consumer:
+                if msg is None:
+                    break
+
+        consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert kafka_span.ec == 1
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.SERVER
+        assert kafka_span.data["kafka"]["service"] == "inexistent_kafka_topic"
+        assert kafka_span.data["kafka"]["access"] == "consume"
+        assert kafka_span.data["kafka"]["error"] == "StopIteration()"
diff --git a/tests/conftest.py b/tests/conftest.py
index 775a6641..342be521 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -17,12 +17,12 @@
 from instana.agent.host import HostAgent
 from instana.collector.base import BaseCollector
+from instana.fsm import TheMachine
 from instana.recorder import StanRecorder
 from instana.span.base_span import BaseSpan
 from instana.span.span import InstanaSpan
 from instana.span_context import SpanContext
 from instana.tracer import InstanaTracerProvider
-from instana.fsm import TheMachine
 
 collect_ignore_glob = [
     "*test_gevent*",
@@ -42,6 +42,8 @@
     collect_ignore_glob.append("*test_gevent*")
     collect_ignore_glob.append("*test_starlette*")
 
+if not os.environ.get("KAFKA_TEST"):
+    collect_ignore_glob.append("*kafka/test*")
 
 if sys.version_info >= (3, 13):
     # Currently not installable dependencies because of 3.13 incompatibilities
diff --git a/tests/requirements-kafka.txt b/tests/requirements-kafka.txt
new file mode 100644
index 00000000..845f4c7b
--- /dev/null
+++ b/tests/requirements-kafka.txt
@@ -0,0 +1,6 @@
+coverage>=5.5
+mock>=2.0.0
+pytest
+kafka-python>=2.0.0; python_version < "3.12"
+kafka-python-ng>=2.0.0; python_version >= "3.12"
+confluent-kafka>=2.0.0
\ No newline at end of file
diff --git a/tests/requirements-pre314.txt b/tests/requirements-pre314.txt
index c57055b7..8b365d88 100644
--- a/tests/requirements-pre314.txt
+++ b/tests/requirements-pre314.txt
@@ -41,4 +41,3 @@ tornado>=6.4.1
 uvicorn>=0.13.4
 urllib3>=1.26.5
 httpx>=0.27.0
-kafka-python-ng>=2.0.0
diff --git a/tests/requirements.txt b/tests/requirements.txt
index ad4fd0ed..9856462a 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -40,5 +40,3 @@ tornado>=6.4.1
 uvicorn>=0.13.4
 urllib3>=1.26.5
 httpx>=0.27.0
-kafka-python>=2.0.0; python_version < "3.12"
-kafka-python-ng>=2.0.0; python_version >= "3.12"
\ No newline at end of file
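For reference, a rough sketch (again illustrative, not part of this patch) of the header propagation that trace_kafka_produce performs: the headers object passed to produce() is handed to tracer.inject() before delivery, so a delivery callback can observe the Instana trace headers alongside application headers. The broker, topic, and callback name are placeholders, and this assumes the same active-agent setup as the earlier sketch:

    import confluent_kafka

    from instana.singletons import tracer


    def on_delivery(err, msg):
        # msg.headers() now contains the injected Instana trace headers
        # in addition to ("app-header", b"value").
        print(msg.topic(), msg.headers())


    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
    with tracer.start_as_current_span("demo"):
        producer.produce(
            "demo-topic",
            b"payload",
            headers=[("app-header", b"value")],
            on_delivery=on_delivery,
        )
        producer.flush()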