diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index e256667..4d009b2 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -51,6 +51,11 @@ jobs: POSTGRES_USER: postgres ports: - 5432:5432 + mqtt: + image: "eclipse-mosquitto" + ports: + - 1883:1883 + options: --hostname mqtt mosquitto -c /mosquitto-no-auth.conf steps: - uses: "actions/checkout@v2" diff --git a/README.md b/README.md index 61709ac..6f3e056 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Broadcaster helps you develop realtime streaming functionality by providing a simple broadcast API onto a number of different backend services. -It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing. +It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), [MQTT](https://mqtt.org/) plus a simple in-memory backend, that you can use for local development or during testing. WebSockets Demo @@ -73,6 +73,7 @@ Python 3.7+ * `pip install broadcaster[redis]` * `pip install broadcaster[postgres]` * `pip install broadcaster[kafka]` +* `pip install broadcaster[mqtt]` ## Available backends @@ -80,6 +81,7 @@ Python 3.7+ * `Broadcast("redis://localhost:6379")` * `Broadcast("postgres://localhost:5432/broadcaster")` * `Broadcast("kafka://localhost:9092")` +* `Broadcast("mqtt://localhost:1883")` ## Where next? diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py new file mode 100644 index 0000000..6fa825b --- /dev/null +++ b/broadcaster/_backends/mqtt.py @@ -0,0 +1,44 @@ +import typing +from contextlib import AsyncExitStack +from urllib.parse import urlparse + +import asyncio_mqtt + +from .._base import Event +from .base import BroadcastBackend + + +class MqttBackend(BroadcastBackend): + def __init__(self, url: str): + parsed_url = urlparse(url) + self._host = parsed_url.hostname or "localhost" + self._port = 8883 if parsed_url.scheme == "mqtts" else 1883 + self._port = parsed_url.port or self._port + + async def connect(self) -> None: + self.stack = AsyncExitStack() + + self.client = asyncio_mqtt.Client(self._host, port=self._port) + self.messages = self.client.filtered_messages("#") + + self.client = await self.stack.enter_async_context(self.client) + self.messages = await self.stack.enter_async_context(self.messages) + + async def disconnect(self) -> None: + await self.stack.aclose() + + async def subscribe(self, channel: str) -> None: + await self.client.subscribe(channel) + + async def unsubscribe(self, channel: str) -> None: + await self.client.unsubscribe(channel) + + async def publish(self, channel: str, message: typing.Any) -> None: + await self.client.publish(channel, message, retain=False) + + async def next_published(self) -> Event: + async for message in self.messages: + event = Event(channel=message.topic, message=message.payload) + break + + return event diff --git a/broadcaster/_base.py b/broadcaster/_base.py index c58cb1d..776fbe2 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -51,6 +51,11 @@ def __init__(self, url: str): self._backend = MemoryBackend(url) + elif parsed_url.scheme in ("mqtt", "mqtts"): + from ._backends.mqtt import MqttBackend + + self._backend = MqttBackend(url) + async def __aenter__(self) -> "Broadcast": await self.connect() return self diff --git a/docker-compose.yaml b/docker-compose.yaml index 60073b4..03fcbaa 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,3 +36,10 @@ services: - POSTGRES_USER=postgres ports: - 5432:5432 + mqtt: + image: "eclipse-mosquitto" + hostname: mqtt + command: mosquitto -c /mosquitto-no-auth.conf + ports: + - 1883:1883 + diff --git a/example/README.md b/example/README.md index 069d978..26b8ef0 100644 --- a/example/README.md +++ b/example/README.md @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env | kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` | | redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` | | postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` | +| mqtt | `export BROADCAST_URL=mqtt://localhost:1883` | `docker-compose up mqtt` | diff --git a/example/requirements.txt b/example/requirements.txt index 2b7b4ad..a7f8a09 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,4 +1,4 @@ uvicorn starlette jinja2 -broadcaster[redis,postgres,kafka] +broadcaster[redis,postgres,kafka,mqtt] diff --git a/requirements.txt b/requirements.txt index 02846f0..f8edf92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ --e .[redis,postgres,kafka] +-e .[redis,postgres,kafka,mqtt] # Documentation mkdocs==1.3.1 diff --git a/scripts/docker/mqtt/mosquitto-no-auth.conf b/scripts/docker/mqtt/mosquitto-no-auth.conf new file mode 100644 index 0000000..40dd92b --- /dev/null +++ b/scripts/docker/mqtt/mosquitto-no-auth.conf @@ -0,0 +1,5 @@ +# This is a Mosquitto configuration file that creates a listener on port 1883 +# that allows unauthenticated access. + +listener 1883 +allow_anonymous true diff --git a/scripts/start b/scripts/start index 188ffae..9c08216 100755 --- a/scripts/start +++ b/scripts/start @@ -1,12 +1,13 @@ #!/bin/sh -e -# Accepted values: postgres, kafka, redis +# Accepted values: postgres, kafka, redis, mqtt # If no variable provided all services will start if [ -n "$1" ]; then - if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then + if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "mqtt" ]; then echo "Not a valid value. Choose one or none: kafka redis - postgres "; + postgres + mqtt "; exit 1; fi fi diff --git a/setup.py b/setup.py index 9cdf906..0277526 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ def get_packages(package): "redis": ["asyncio-redis"], "postgres": ["asyncpg"], "kafka": ["aiokafka"], + "mqtt": ["asyncio-mqtt"], "test": ["pytest", "pytest-asyncio"], }, classifiers=[ diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index e3313bc..4b6adaf 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -44,3 +44,13 @@ async def test_kafka(): event = await subscriber.get() assert event.channel == "chatroom" assert event.message == "hello" + + +@pytest.mark.asyncio +async def test_mqtt(): + async with Broadcast("mqtt://localhost:1883") as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") + event = await subscriber.get() + assert event.channel == "chatroom" + assert event.message == b"hello"