Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/kafka/README.rst
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.. autoclass:: testcontainers.kafka.KafkaContainer
.. title:: testcontainers.kafka.KafkaContainer
.. autoclass:: testcontainers.redpanda.RedpandaContainer
1 change: 1 addition & 0 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.kafka.redpanda import RedpandaContainer


class KafkaContainer(DockerContainer):
Expand Down
82 changes: 82 additions & 0 deletions modules/kafka/testcontainers/kafka/redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import tarfile
import time
from io import BytesIO
from textwrap import dedent

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs


class RedpandaContainer(DockerContainer):
"""
Redpanda container.

Example:

.. doctest::

>>> from testcontainers.redpanda import RedpandaContainer

>>> with RedpandaContainer() as redpanda:
... connection = redpanda.get_bootstrap_server()
"""

TC_START_SCRIPT = "/tc-start.sh"

def __init__(
self,
image: str = "docker.redpanda.com/redpandadata/redpanda:v23.1.13",
**kwargs,
) -> None:
kwargs["entrypoint"] = "sh"
super(RedpandaContainer, self).__init__(image, **kwargs)
self.redpanda_port = 9092
self.schema_registry_port = 8081
self.with_exposed_ports(self.redpanda_port, self.schema_registry_port)

def get_bootstrap_server(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)
return f"{host}:{port}"

def get_schema_registry_address(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.schema_registry_port)
return f"http://{host}:{port}"

def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)

data = (
dedent(
f"""
#!/bin/bash
/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G \
--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \
--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://{host}:{port}
"""
)
.strip()
.encode("utf-8")
)

self.create_file(data, RedpandaContainer.TC_START_SCRIPT)

def start(self, timeout=10) -> "RedpandaContainer":
script = RedpandaContainer.TC_START_SCRIPT
command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar:
tarinfo = tarfile.TarInfo(name=path)
tarinfo.size = len(content)
tarinfo.mtime = time.time()
tar.addfile(tarinfo, BytesIO(content))
archive.seek(0)
self.get_wrapped_container().put_archive("/", archive)
56 changes: 56 additions & 0 deletions modules/kafka/tests/test_redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import requests
import json
from kafka import KafkaConsumer, KafkaProducer, TopicPartition, KafkaAdminClient
from kafka.admin import NewTopic
from testcontainers.kafka import RedpandaContainer


def test_redpanda_producer_consumer():
with RedpandaContainer() as container:
produce_and_consume_message(container)


def test_redpanda_confluent_latest():
with RedpandaContainer(
image="docker.redpanda.com/redpandadata/redpanda:latest"
) as container:
produce_and_consume_message(container)


def test_schema_registry():
with RedpandaContainer() as container:
address = container.get_schema_registry_address()
subject_name = "test-subject-value"
url = f"{address}/subjects"

payload = {"schema": json.dumps({"type": "string"})}
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
create_result = requests.post(
f"{url}/{subject_name}/versions", data=json.dumps(payload), headers=headers
)
assert create_result.status_code == 200

result = requests.get(url)
assert result.status_code == 200
assert subject_name in result.json()


def produce_and_consume_message(container):
topic = "test-topic"
bootstrap_server = container.get_bootstrap_server()

admin = KafkaAdminClient(bootstrap_servers=[bootstrap_server])
admin.create_topics([NewTopic(topic, 1, 1)])

producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
future = producer.send(topic, b"verification message")
future.get(timeout=10)
producer.close()

consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server])
tp = TopicPartition(topic, 0)
consumer.assign([tp])
consumer.seek_to_beginning()
assert (
consumer.end_offsets([tp])[tp] == 1
), "Expected exactly one test message to be present on test topic !"