diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 81db761..0e17d07 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -53,6 +53,11 @@ jobs: - 5432:5432 steps: + # eclipse-mosquitto does not support configuration via environment variables + # so it is not possible to use github's service feature to start the broker + # instead, we start the broker using docker run command and stop it at the end of the job + - name: "Start MQTT broker" + run: "docker run -d -p 1883:1883 --name mqtt eclipse-mosquitto:2.0-openssl mosquitto -c /mosquitto-no-auth.conf" - uses: "actions/checkout@v2" - uses: "actions/setup-python@v2" with: @@ -63,7 +68,13 @@ jobs: run: "scripts/check" - name: "Build package & docs" run: "scripts/build" + + - name: "Run tests" run: "scripts/test" - name: "Enforce coverage" run: "scripts/coverage" + + - name: "Stop MQTT broker" + if: always() + run: "docker stop mqtt" \ No newline at end of file diff --git a/README.md b/README.md index eb547ab..3fdb3cc 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Broadcaster helps you develop realtime streaming functionality by providing a simple broadcast API onto a number of different backend services. -It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing. +It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), [MQTT](https://mqtt.org/) plus a simple in-memory backend, that you can use for local development or during testing. WebSockets Demo @@ -78,6 +78,7 @@ Python 3.8+ * `pip install broadcaster[redis]` * `pip install broadcaster[postgres]` * `pip install broadcaster[kafka]` +* `pip install broadcaster[mqtt]` ## Available backends @@ -86,6 +87,7 @@ Python 3.8+ * `Broadcast("redis-stream://localhost:6379")` * `Broadcast("postgres://localhost:5432/broadcaster")` * `Broadcast("kafka://localhost:9092")` +* `Broadcast("mqtt://localhost:1883")` ### Using custom backends diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py new file mode 100644 index 0000000..aa08f74 --- /dev/null +++ b/broadcaster/_backends/mqtt.py @@ -0,0 +1,54 @@ +import asyncio +import typing +from urllib.parse import urlparse + +import aiomqtt + +from .._base import Event +from .base import BroadcastBackend + + +class MqttBackend(BroadcastBackend): + def __init__(self, url: str): + parsed_url = urlparse(url) + self._host = parsed_url.hostname or "localhost" + self._port = 8883 if parsed_url.scheme == "mqtts" else 1883 + self._port = parsed_url.port or self._port + self._client = aiomqtt.Client(self._host, port=self._port) + self._queue: asyncio.Queue[aiomqtt.Message] = asyncio.Queue() + self._listener_task = asyncio.create_task(self._listener()) + + async def connect(self) -> None: + await self._client.__aenter__() + + async def disconnect(self) -> None: + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + + await self._client.__aexit__(None, None, None) + + async def subscribe(self, channel: str) -> None: + await self._client.subscribe(channel) + + async def unsubscribe(self, channel: str) -> None: + await self._client.unsubscribe(channel) + + async def publish(self, channel: str, message: typing.Any) -> None: + await self._client.publish(channel, message, retain=False) + + async def next_published(self) -> Event: + message = await self._queue.get() + + # Event.message is string, not bytes + # this is a limiting factor and we need to make sure + # that the payload is bytes in order to properly decode it + assert isinstance(message.payload, bytes), "Payload must be bytes." + + return Event(channel=message.topic.value, message=message.payload.decode()) + + async def _listener(self) -> None: + async for message in self._client.messages: + await self._queue.put(message) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 4650e0a..a182088 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -57,6 +57,12 @@ def _create_backend(self, url: str) -> BroadcastBackend: from broadcaster._backends.memory import MemoryBackend return MemoryBackend(url) + + elif parsed_url.scheme in ("mqtt", "mqtts"): + from ._backends.mqtt import MqttBackend + + return MqttBackend(url) + raise ValueError(f"Unsupported backend: {parsed_url.scheme}") async def __aenter__(self) -> Broadcast: diff --git a/docker-compose.yaml b/docker-compose.yaml index 60073b4..03fcbaa 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,3 +36,10 @@ services: - POSTGRES_USER=postgres ports: - 5432:5432 + mqtt: + image: "eclipse-mosquitto" + hostname: mqtt + command: mosquitto -c /mosquitto-no-auth.conf + ports: + - 1883:1883 + diff --git a/example/README.md b/example/README.md index 069d978..26b8ef0 100644 --- a/example/README.md +++ b/example/README.md @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env | kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` | | redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` | | postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` | +| mqtt | `export BROADCAST_URL=mqtt://localhost:1883` | `docker-compose up mqtt` | diff --git a/example/requirements.txt b/example/requirements.txt index 44835e5..5fccb92 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -2,4 +2,4 @@ uvicorn websockets starlette jinja2 -broadcaster[redis,postgres,kafka] +broadcaster[redis,postgres,kafka,mqtt] diff --git a/pyproject.toml b/pyproject.toml index c4e8036..54800d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ redis = ["redis"] postgres = ["asyncpg"] kafka = ["aiokafka"] +mqtt = ["aiomqtt"] test = ["pytest", "pytest-asyncio"] [project.urls] diff --git a/requirements.txt b/requirements.txt index ed2926b..aca606f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ --e .[redis,postgres,kafka] +-e .[redis,postgres,kafka,mqtt] # Documentation mkdocs==1.5.3 diff --git a/scripts/start b/scripts/start index 188ffae..9c08216 100755 --- a/scripts/start +++ b/scripts/start @@ -1,12 +1,13 @@ #!/bin/sh -e -# Accepted values: postgres, kafka, redis +# Accepted values: postgres, kafka, redis, mqtt # If no variable provided all services will start if [ -n "$1" ]; then - if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then + if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "mqtt" ]; then echo "Not a valid value. Choose one or none: kafka redis - postgres "; + postgres + mqtt "; exit 1; fi fi diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index d418508..4f04f8d 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -91,6 +91,16 @@ async def test_kafka(): assert event.message == "hello" +@pytest.mark.asyncio +async def test_mqtt(): + async with Broadcast("mqtt://localhost:1883") as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") + event = await subscriber.get() + assert event.channel == "chatroom" + assert event.message == "hello" + + @pytest.mark.asyncio async def test_custom(): backend = CustomBackend("")