diff --git a/modules/kafka/README.rst b/modules/kafka/README.rst index 144c0fc2a..a481c7870 100644 --- a/modules/kafka/README.rst +++ b/modules/kafka/README.rst @@ -1,2 +1,3 @@ .. autoclass:: testcontainers.kafka.KafkaContainer .. title:: testcontainers.kafka.KafkaContainer +.. autoclass:: testcontainers.redpanda.RedpandaContainer diff --git a/modules/kafka/testcontainers/kafka/__init__.py b/modules/kafka/testcontainers/kafka/__init__.py index 399839433..2d6b5f552 100644 --- a/modules/kafka/testcontainers/kafka/__init__.py +++ b/modules/kafka/testcontainers/kafka/__init__.py @@ -8,6 +8,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.utils import raise_for_deprecated_parameter from testcontainers.core.waiting_utils import wait_container_is_ready +from testcontainers.kafka.redpanda import RedpandaContainer class KafkaContainer(DockerContainer): diff --git a/modules/kafka/testcontainers/kafka/redpanda.py b/modules/kafka/testcontainers/kafka/redpanda.py new file mode 100644 index 000000000..a800026a1 --- /dev/null +++ b/modules/kafka/testcontainers/kafka/redpanda.py @@ -0,0 +1,82 @@ +import tarfile +import time +from io import BytesIO +from textwrap import dedent + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + + +class RedpandaContainer(DockerContainer): + """ + Redpanda container. + + Example: + + .. doctest:: + + >>> from testcontainers.redpanda import RedpandaContainer + + >>> with RedpandaContainer() as redpanda: + ... connection = redpanda.get_bootstrap_server() + """ + + TC_START_SCRIPT = "/tc-start.sh" + + def __init__( + self, + image: str = "docker.redpanda.com/redpandadata/redpanda:v23.1.13", + **kwargs, + ) -> None: + kwargs["entrypoint"] = "sh" + super(RedpandaContainer, self).__init__(image, **kwargs) + self.redpanda_port = 9092 + self.schema_registry_port = 8081 + self.with_exposed_ports(self.redpanda_port, self.schema_registry_port) + + def get_bootstrap_server(self) -> str: + host = self.get_container_host_ip() + port = self.get_exposed_port(self.redpanda_port) + return f"{host}:{port}" + + def get_schema_registry_address(self) -> str: + host = self.get_container_host_ip() + port = self.get_exposed_port(self.schema_registry_port) + return f"http://{host}:{port}" + + def tc_start(self) -> None: + host = self.get_container_host_ip() + port = self.get_exposed_port(self.redpanda_port) + + data = ( + dedent( + f""" + #!/bin/bash + /usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G \ + --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \ + --advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://{host}:{port} + """ + ) + .strip() + .encode("utf-8") + ) + + self.create_file(data, RedpandaContainer.TC_START_SCRIPT) + + def start(self, timeout=10) -> "RedpandaContainer": + script = RedpandaContainer.TC_START_SCRIPT + command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"' + self.with_command(command) + super().start() + self.tc_start() + wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout) + return self + + def create_file(self, content: bytes, path: str) -> None: + with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar: + tarinfo = tarfile.TarInfo(name=path) + tarinfo.size = len(content) + tarinfo.mtime = time.time() + tar.addfile(tarinfo, BytesIO(content)) + archive.seek(0) + self.get_wrapped_container().put_archive("/", archive) diff --git a/modules/kafka/tests/test_redpanda.py b/modules/kafka/tests/test_redpanda.py new file mode 100644 index 000000000..5f53b64b1 --- /dev/null +++ b/modules/kafka/tests/test_redpanda.py @@ -0,0 +1,56 @@ +import requests +import json +from kafka import KafkaConsumer, KafkaProducer, TopicPartition, KafkaAdminClient +from kafka.admin import NewTopic +from testcontainers.kafka import RedpandaContainer + + +def test_redpanda_producer_consumer(): + with RedpandaContainer() as container: + produce_and_consume_message(container) + + +def test_redpanda_confluent_latest(): + with RedpandaContainer( + image="docker.redpanda.com/redpandadata/redpanda:latest" + ) as container: + produce_and_consume_message(container) + + +def test_schema_registry(): + with RedpandaContainer() as container: + address = container.get_schema_registry_address() + subject_name = "test-subject-value" + url = f"{address}/subjects" + + payload = {"schema": json.dumps({"type": "string"})} + headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"} + create_result = requests.post( + f"{url}/{subject_name}/versions", data=json.dumps(payload), headers=headers + ) + assert create_result.status_code == 200 + + result = requests.get(url) + assert result.status_code == 200 + assert subject_name in result.json() + + +def produce_and_consume_message(container): + topic = "test-topic" + bootstrap_server = container.get_bootstrap_server() + + admin = KafkaAdminClient(bootstrap_servers=[bootstrap_server]) + admin.create_topics([NewTopic(topic, 1, 1)]) + + producer = KafkaProducer(bootstrap_servers=[bootstrap_server]) + future = producer.send(topic, b"verification message") + future.get(timeout=10) + producer.close() + + consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server]) + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + consumer.seek_to_beginning() + assert ( + consumer.end_offsets([tp])[tp] == 1 + ), "Expected exactly one test message to be present on test topic !"