From f7ff3880100fea38eeb7696e38fdcaa3683c9d2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Santiago=20Fraire=20Willemo=C3=ABs?= Date: Fri, 21 Feb 2020 18:42:57 +0100 Subject: [PATCH 1/2] feat: add kafka broadcast support --- README.md | 2 ++ broadcaster/_backends/kafka.py | 39 +++++++++++++++++++++ broadcaster/_backends/memory.py | 4 +-- broadcaster/_backends/postgres.py | 4 +-- broadcaster/_base.py | 6 +++- docker-compose.yaml | 38 ++++++++++++++++++++ example/README.md | 24 +++++++++++++ example.py => example/app.py | 13 ++++--- example/requirements.txt | 7 ++++ {templates => example/templates}/index.html | 0 scripts/start | 14 ++++++++ setup.py | 1 + tests/__init__.py | 0 tests/test_broadcast.py | 12 ++++++- 14 files changed, 153 insertions(+), 11 deletions(-) create mode 100644 broadcaster/_backends/kafka.py create mode 100644 docker-compose.yaml create mode 100644 example/README.md rename example.py => example/app.py (82%) create mode 100644 example/requirements.txt rename {templates => example/templates}/index.html (100%) create mode 100755 scripts/start create mode 100644 tests/__init__.py diff --git a/README.md b/README.md index 2283368..e9cbd21 100644 --- a/README.md +++ b/README.md @@ -70,12 +70,14 @@ The HTML template for the front end [is available here](https://github.com/encod * `pip install broadcaster` * `pip install broadcaster[redis]` * `pip install broadcaster[postgres]` +* `pip install broadcaster[kafka]` ## Available backends * `Broadcast('memory://')` * `Broadcast("redis://localhost:6379")` * `Broadcast("postgres://localhost:5432/hostedapi")` +* `Broadcast("kafka://localhost:9092")` ## Where next? diff --git a/broadcaster/_backends/kafka.py b/broadcaster/_backends/kafka.py new file mode 100644 index 0000000..a3df086 --- /dev/null +++ b/broadcaster/_backends/kafka.py @@ -0,0 +1,39 @@ +import asyncio +import typing +from urllib.parse import urlparse + +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer + +from .._base import Event +from .base import BroadcastBackend + + +class KafkaBackend(BroadcastBackend): + def __init__(self, url: str): + self._servers = [urlparse(url).netloc] + self._consumer_channels: typing.Set = set() + + async def connect(self) -> None: + loop = asyncio.get_event_loop() + self._producer = AIOKafkaProducer(loop=loop, bootstrap_servers=self._servers) + self._consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self._servers) + await self._producer.start() + await self._consumer.start() + + async def disconnect(self) -> None: + await self._producer.stop() + await self._consumer.stop() + + async def subscribe(self, channel: str) -> None: + self._consumer_channels.add(channel) + self._consumer.subscribe(topics=self._consumer_channels) + + async def unsubscribe(self, channel: str) -> None: + await self._consumer.unsubscribe() + + async def publish(self, channel: str, message: typing.Any) -> None: + await self._producer.send_and_wait(channel, message.encode("utf8")) + + async def next_published(self) -> Event: + message = await self._consumer.getone() + return Event(channel=message.topic, message=message.value.decode("utf8")) diff --git a/broadcaster/_backends/memory.py b/broadcaster/_backends/memory.py index 5f56c26..013c028 100644 --- a/broadcaster/_backends/memory.py +++ b/broadcaster/_backends/memory.py @@ -6,8 +6,8 @@ class MemoryBackend(BroadcastBackend): def __init__(self, url: str): - self._subscribed = set() - self._published = asyncio.Queue() + self._subscribed: typing.Set = set() + self._published: asyncio.Queue = asyncio.Queue() async def connect(self) -> None: pass diff --git a/broadcaster/_backends/postgres.py b/broadcaster/_backends/postgres.py index 81ab5ab..19719a9 100644 --- a/broadcaster/_backends/postgres.py +++ b/broadcaster/_backends/postgres.py @@ -1,7 +1,5 @@ import asyncio import asyncpg -import typing -from urllib.parse import ParseResult from .base import BroadcastBackend from .._base import Event @@ -9,7 +7,7 @@ class PostgresBackend(BroadcastBackend): def __init__(self, url: str): self._url = url - self._listen_queue = asyncio.Queue() + self._listen_queue: asyncio.Queue = asyncio.Queue() async def connect(self) -> None: self._conn = await asyncpg.connect(self._url) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 3e73df4..44ec030 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -36,6 +36,10 @@ def __init__(self, url: str): from ._backends.postgres import PostgresBackend self._backend = PostgresBackend(url) + if parsed_url.scheme == 'kafka': + from ._backends.kafka import KafkaBackend + self._backend = KafkaBackend(url) + elif parsed_url.scheme == 'memory': from ._backends.memory import MemoryBackend self._backend = MemoryBackend(url) @@ -69,7 +73,7 @@ async def publish(self, channel: str, message: typing.Any) -> None: @asynccontextmanager async def subscribe(self, channel: str) -> 'Subscriber': - queue = asyncio.Queue() + queue: asyncio.Queue = asyncio.Queue() try: if not self._subscribers.get(channel): diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..3a5fde0 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,38 @@ +version: '3' +services: + zookeeper: + image: "confluentinc/cp-zookeeper" + hostname: zookeeper + ports: + - 32181:32181 + environment: + - ZOOKEEPER_CLIENT_PORT=32181 + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: confluentinc/cp-kafka + hostname: kafka + ports: + - 9092:9092 + - 29092:29092 + depends_on: + - zookeeper + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT_HOST://localhost:29092,PLAINTEXT://kafka:9092 + - KAFKA_BROKER_ID=1 + - ALLOW_PLAINTEXT_LISTENER=yes + redis: + image: "redis:alpine" + ports: + - 6379:6379 + postgres: + image: "postgres:12" + environment: + - POSTGRES_DB=hostedapi + - POSTGRES_PASSWORD=postgres + - POSTGRES_HOST_AUTH_METHOD=trust + - POSTGRES_USER=postgres + ports: + - 5432:5432 diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..611004d --- /dev/null +++ b/example/README.md @@ -0,0 +1,24 @@ +# Setup + +Install python dependencies in your virtualenv + +```bash +pip install -r requirements.txt +``` + +Run example with memory as backend. + +```bash +uvicorn example.app:app +``` + +You can also install broadcaster locally using `pip install -e .`. + +In order to run the app with different backends, you have to set the env +`BROADCAST_URL` and start the docker services. + +| Backend | Env | Service command | +| -------- | ---------------------------------------------------------- | ---------------------------- | +| kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` | +| redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` | +| postgres | `export BROADCAST_URL=postgres://localhost:5432/hostedapi` | `docker-compose up postgres` | diff --git a/example.py b/example/app.py similarity index 82% rename from example.py rename to example/app.py index cfce6cc..9b57e44 100644 --- a/example.py +++ b/example/app.py @@ -1,12 +1,17 @@ -from broadcaster import Broadcast +import os + +import uvicorn from starlette.applications import Starlette from starlette.concurrency import run_until_first_complete from starlette.routing import Route, WebSocketRoute from starlette.templating import Jinja2Templates +from broadcaster import Broadcast + +BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://") -broadcast = Broadcast("redis://localhost:6379") -templates = Jinja2Templates("templates") +broadcast = Broadcast(BROADCAST_URL) +templates = Jinja2Templates("example/templates") async def homepage(request): @@ -36,7 +41,7 @@ async def chatroom_ws_sender(websocket): routes = [ Route("/", homepage), - WebSocketRoute("/", chatroom_ws, name='chatroom_ws'), + WebSocketRoute("/", chatroom_ws, name="chatroom_ws"), ] diff --git a/example/requirements.txt b/example/requirements.txt new file mode 100644 index 0000000..730d740 --- /dev/null +++ b/example/requirements.txt @@ -0,0 +1,7 @@ +uvicorn +starlette +jinja2 +aiokafka +asyncio-redis +asyncpg +broadcaster[redis,postgres,kafka] diff --git a/templates/index.html b/example/templates/index.html similarity index 100% rename from templates/index.html rename to example/templates/index.html diff --git a/scripts/start b/scripts/start new file mode 100755 index 0000000..188ffae --- /dev/null +++ b/scripts/start @@ -0,0 +1,14 @@ +#!/bin/sh -e +# Accepted values: postgres, kafka, redis +# If no variable provided all services will start +if [ -n "$1" ]; then + if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then + echo "Not a valid value. Choose one or none: + kafka + redis + postgres "; + exit 1; + fi +fi + +docker-compose up $1 diff --git a/setup.py b/setup.py index 4db885b..61793bb 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ def get_packages(package): extras_require={ "redis": ["asyncio-redis"], "postgres": ["asyncpg"], + "kafka": ["aiokafka"] }, classifiers=[ "Development Status :: 3 - Alpha", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 6bac3db..bcde18a 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -25,7 +25,17 @@ async def test_redis(): @pytest.mark.asyncio async def test_postgres(): - async with Broadcast('postgres://localhost:5432/hostedapi') as broadcast: + async with Broadcast('postgres://postgres:postgres@localhost:5432/hostedapi') as broadcast: + async with broadcast.subscribe('chatroom') as subscriber: + await broadcast.publish('chatroom', 'hello') + event = await subscriber.get() + assert event.channel == 'chatroom' + assert event.message == 'hello' + + +@pytest.mark.asyncio +async def test_kafka(): + async with Broadcast('kafka://localhost:9092') as broadcast: async with broadcast.subscribe('chatroom') as subscriber: await broadcast.publish('chatroom', 'hello') event = await subscriber.get() From baa1656a9485a3ea5a25f287ef17949de66ea3f6 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 26 Feb 2020 12:11:11 +0000 Subject: [PATCH 2/2] Update requirements.txt --- example/requirements.txt | 3 --- 1 file changed, 3 deletions(-) diff --git a/example/requirements.txt b/example/requirements.txt index 730d740..2b7b4ad 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,7 +1,4 @@ uvicorn starlette jinja2 -aiokafka -asyncio-redis -asyncpg broadcaster[redis,postgres,kafka]