Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -164,6 +173,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -192,6 +210,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -221,6 +248,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -250,6 +286,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -293,6 +338,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -322,6 +376,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down
4 changes: 4 additions & 0 deletions .tekton/python-tracer-prepuller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ spec:
# public.ecr.aws/docker/library/postgres:16.2-bookworm
image: public.ecr.aws/docker/library/postgres@sha256:07572430dbcd821f9f978899c3ab3a727f5029be9298a41662e1b5404d5b73e0
command: ["sh", "-c", "'true'"]
- name: prepuller-kafka
# public.ecr.aws/bitnami/kafka:3.9.0
image: public.ecr.aws/docker/library/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
command: ["sh", "-c", "'true'"]
- name: prepuller-38
# public.ecr.aws/docker/library/python:3.8.20-bookworm
image: public.ecr.aws/docker/library/python@
Expand Down
19 changes: 19 additions & 0 deletions .tekton/task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ spec:
- name: rabbitmq
# public.ecr.aws/docker/library/rabbitmq:3.13.0
image: public.ecr.aws/docker/library/rabbitmq@sha256:39de1a4fc6c72d12bd5dfa23e8576536fd1c0cc8418344cd5a51addfc9a1145d
- name: kafka
# public.ecr.aws/bitnami/kafka:3.9.0
image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
env:
- name: KAFKA_CFG_NODE_ID
value: 0
- name: KAFKA_CFG_PROCESS_ROLES
value: controller,broker
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
value: 0@kafka:9093
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
value: CONTROLLER
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094

params:
- name: imageDigest
type: string
Expand Down
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,17 @@ services:
ports:
- "8681:8681"
- "8682:8682"

kafka:
image: public.ecr.aws/bitnami/kafka:latest
ports:
- '9092:9092'
- '9094:9094'
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
23 changes: 15 additions & 8 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,50 @@ def boot_agent() -> None:
asyncio, # noqa: F401
boto3_inst, # noqa: F401
cassandra_inst, # noqa: F401
celery, # noqa: F401
couchbase_inst, # noqa: F401
fastapi_inst, # noqa: F401
flask, # noqa: F401
# gevent_inst, # noqa: F401
grpcio, # noqa: F401
logging, # noqa: F401
mysqlclient, # noqa: F401
pika, # noqa: F401
pep0249, # noqa: F401
pika, # noqa: F401
psycopg2, # noqa: F401
pymongo, # noqa: F401
pymysql, # noqa: F401
pyramid, # noqa: F401
redis, # noqa: F401
sanic_inst, # noqa: F401
sqlalchemy, # noqa: F401
starlette_inst, # noqa: F401
sanic_inst, # noqa: F401
urllib3, # noqa: F401
)
from instana.instrumentation.aiohttp import (
client, # noqa: F401
server, # noqa: F401
client as aiohttp_client, # noqa: F401
)
from instana.instrumentation.aiohttp import (
server as aiohttp_server, # noqa: F401
)
from instana.instrumentation.aws import lambda_inst # noqa: F401
from instana.instrumentation import celery # noqa: F401
from instana.instrumentation.django import middleware # noqa: F401
from instana.instrumentation.google.cloud import (
pubsub, # noqa: F401
storage, # noqa: F401
)
from instana.instrumentation.kafka import (
kafka_python, # noqa: F401
)
from instana.instrumentation.tornado import (
client as tornado_client, # noqa: F401
)
from instana.instrumentation.tornado import (
client, # noqa: F401
server, # noqa: F401
server as tornado_server, # noqa: F401
)

# Hooks
from instana.hooks import hook_uwsgi, hook_gunicorn # noqa: F401
from instana.hooks import hook_gunicorn, hook_uwsgi # noqa: F401


if "INSTANA_DISABLE" not in os.environ:
Expand Down
1 change: 1 addition & 0 deletions src/instana/instrumentation/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# (c) Copyright IBM Corp. 2025
90 changes: 90 additions & 0 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# (c) Copyright IBM Corp. 2025

try:
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple

import kafka # noqa: F401
import wrapt
from opentelemetry.trace import SpanKind

from instana.log import logger
from instana.propagators.format import Format
from instana.util.traceutils import (
get_tracer_tuple,
tracing_is_off,
)

if TYPE_CHECKING:
from kafka.producer.future import FutureRecordMetadata

@wrapt.patch_function_wrapper("kafka", "KafkaProducer.send")
def trace_kafka_send(
wrapped: Callable[..., "kafka.KafkaProducer.send"],
instance: "kafka.KafkaProducer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

with tracer.start_as_current_span(
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
) as span:
span.set_attribute("kafka.service", args[0])
span.set_attribute("kafka.access", "send")

# context propagation
tracer.inject(
span.context,
Format.KAFKA_HEADERS,
kwargs.get("headers", {}),
disable_w3c_trace_context=True,
)

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
def trace_kafka_consume(
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
instance: "kafka.KafkaConsumer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
topic = list(instance.subscription())[0]
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", "consume")

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

logger.debug("Instrumenting Kafka (kafka-python)")
except ImportError:
pass
12 changes: 12 additions & 0 deletions src/instana/propagators/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,15 @@ class Format(object):
should use a prefix or other convention to distinguish tracer-specific
key:value pairs.
"""

KAFKA_HEADERS = "kafka_headers"
"""
The KAFKA_HEADERS format represents :class:`SpanContext`\\ s in a python
``dict`` mapping from character-restricted strings to strings.

Keys and values in the KAFKA_HEADERS carrier must be suitable for use as
HTTP headers (without modification or further escaping). That is, the
keys have a greatly restricted character set, casing for the keys may not
be preserved by various intermediaries, and the values should be
URL-escaped.
"""
61 changes: 61 additions & 0 deletions src/instana/propagators/kafka_propagator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# (c) Copyright IBM Corp. 2025
from typing import TYPE_CHECKING

from opentelemetry.trace.span import format_span_id

from instana.log import logger
from instana.propagators.base_propagator import BasePropagator, CarrierT
from instana.util.ids import hex_id_limited

if TYPE_CHECKING:
from instana.span_context import SpanContext


class KafkaPropagator(BasePropagator):
"""
Instana Propagator for Format.KAFKA_HEADERS.

The KAFKA_HEADERS format deals with key-values with string to string mapping.
The character set should be restricted to HTTP compatible.
"""

def __init__(self) -> None:
super(KafkaPropagator, self).__init__()

def inject(
self,
span_context: "SpanContext",
carrier: CarrierT,
disable_w3c_trace_context: bool = True,
) -> None:
trace_id = span_context.trace_id
span_id = span_context.span_id
dictionary_carrier = self.extract_headers_dict(carrier)

if dictionary_carrier:
# Suppression `level` made in the child context or in the parent context
# has priority over any non-suppressed `level` setting
child_level = int(
self.extract_instana_headers(dictionary_carrier)[2] or "1"
)
span_context.level = min(child_level, span_context.level)

serializable_level = str(span_context.level)

def inject_key_value(carrier, key, value):
if isinstance(carrier, list):
carrier.append((key, value))
elif isinstance(carrier, dict) or "__setitem__" in dir(carrier):
carrier[key] = value
else:
raise Exception(
f"KafkaPropagator: Unsupported carrier type {type(carrier)}",
)

try:
inject_key_value(carrier, "X_INSTANA_L_S", serializable_level)
inject_key_value(carrier, "X_INSTANA_T", hex_id_limited(trace_id))
inject_key_value(carrier, "X_INSTANA_S", format_span_id(span_id))

except Exception:
logger.debug("KafkaPropagator - inject error:", exc_info=True)
Loading
Loading