From 33bdfc69147037b3943d7315c007fd6a8d07a14f Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Fri, 15 Jan 2021 20:50:24 -0300 Subject: [PATCH 01/12] Adds MQTT backend --- README.md | 4 +++- broadcaster/_backends/mqtt.py | 39 +++++++++++++++++++++++++++++++++++ broadcaster/_base.py | 4 ++++ docker-compose.yaml | 6 ++++++ example/README.md | 1 + example/requirements.txt | 2 +- scripts/start | 7 ++++--- setup.py | 3 ++- tests/test_broadcast.py | 10 +++++++++ 9 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 broadcaster/_backends/mqtt.py diff --git a/README.md b/README.md index 61709ac..6f3e056 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Broadcaster helps you develop realtime streaming functionality by providing a simple broadcast API onto a number of different backend services. -It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing. +It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), [MQTT](https://mqtt.org/) plus a simple in-memory backend, that you can use for local development or during testing. WebSockets Demo @@ -73,6 +73,7 @@ Python 3.7+ * `pip install broadcaster[redis]` * `pip install broadcaster[postgres]` * `pip install broadcaster[kafka]` +* `pip install broadcaster[mqtt]` ## Available backends @@ -80,6 +81,7 @@ Python 3.7+ * `Broadcast("redis://localhost:6379")` * `Broadcast("postgres://localhost:5432/broadcaster")` * `Broadcast("kafka://localhost:9092")` +* `Broadcast("mqtt://localhost:1883")` ## Where next? diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py new file mode 100644 index 0000000..614a466 --- /dev/null +++ b/broadcaster/_backends/mqtt.py @@ -0,0 +1,39 @@ +import asyncio_mqtt +import typing +from urllib.parse import urlparse +from .base import BroadcastBackend +from .._base import Event +from contextlib import AsyncExitStack, asynccontextmanager + +class MqttBackend(BroadcastBackend): + def __init__(self, url: str): + parsed_url = urlparse(url) + self._host = parsed_url.hostname or "localhost" + self._port = 8883 if parsed_url.scheme == 'mqtts' else 1883 + self._port = parsed_url.port or self._port + + async def connect(self) -> None: + self.stack = AsyncExitStack() + + self.client = asyncio_mqtt.Client(self._host, port=self._port) + self.messages = self.client.filtered_messages('#') + + self.client = await self.stack.enter_async_context(self.client) + self.messages = await self.stack.enter_async_context(self.messages) + + async def disconnect(self) -> None: + await self.stack.aclose() + + async def subscribe(self, channel: str) -> None: + await self.client.subscribe(channel) + + async def unsubscribe(self, channel: str) -> None: + await self.client.unsubscribe(channel) + + async def publish(self, channel: str, message: typing.Any) -> None: + await self.client.publish(channel, message, retain=False) + + async def next_published(self) -> Event: + async for message in self.messages: + return Event(channel=message.topic, message=message.payload) + diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 44ec030..aba0700 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -44,6 +44,10 @@ def __init__(self, url: str): from ._backends.memory import MemoryBackend self._backend = MemoryBackend(url) + elif parsed_url.scheme in ('mqtt', 'mqtts'): + from ._backends.mqtt import MqttBackend + self._backend = MqttBackend(url) + async def __aenter__(self) -> 'Broadcast': await self.connect() return self diff --git a/docker-compose.yaml b/docker-compose.yaml index 60073b4..b38a1ba 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,3 +36,9 @@ services: - POSTGRES_USER=postgres ports: - 5432:5432 + mqtt: + image: "eclipse-mosquitto" + hostname: mqtt + ports: + - 1883:1883 + - 8883:8883 diff --git a/example/README.md b/example/README.md index 069d978..26b8ef0 100644 --- a/example/README.md +++ b/example/README.md @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env | kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` | | redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` | | postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` | +| mqtt | `export BROADCAST_URL=mqtt://localhost:1883` | `docker-compose up mqtt` | diff --git a/example/requirements.txt b/example/requirements.txt index 2b7b4ad..a7f8a09 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,4 +1,4 @@ uvicorn starlette jinja2 -broadcaster[redis,postgres,kafka] +broadcaster[redis,postgres,kafka,mqtt] diff --git a/scripts/start b/scripts/start index 188ffae..9c08216 100755 --- a/scripts/start +++ b/scripts/start @@ -1,12 +1,13 @@ #!/bin/sh -e -# Accepted values: postgres, kafka, redis +# Accepted values: postgres, kafka, redis, mqtt # If no variable provided all services will start if [ -n "$1" ]; then - if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then + if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "mqtt" ]; then echo "Not a valid value. Choose one or none: kafka redis - postgres "; + postgres + mqtt "; exit 1; fi fi diff --git a/setup.py b/setup.py index 75229e5..8212b0e 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,8 @@ def get_packages(package): extras_require={ "redis": ["asyncio-redis"], "postgres": ["asyncpg"], - "kafka": ["aiokafka"] + "kafka": ["aiokafka"], + "mqtt": ["asyncio-mqtt"] }, classifiers=[ "Development Status :: 3 - Alpha", diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 7e0c8c0..d4ab4bd 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -41,3 +41,13 @@ async def test_kafka(): event = await subscriber.get() assert event.channel == 'chatroom' assert event.message == 'hello' + +@pytest.mark.asyncio +async def test_mqtt(): + async with Broadcast('mqtt://localhost:1883') as broadcast: + async with broadcast.subscribe('chatroom') as subscriber: + await broadcast.publish('chatroom', 'hello') + event = await subscriber.get() + assert event.channel == 'chatroom' + assert event.message.decode('utf-8') == 'hello' + From a200cd373af858db351a90c15d9a56080fa88b15 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 20 Aug 2022 13:13:31 +0200 Subject: [PATCH 02/12] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7348767..0277526 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ def get_packages(package): "redis": ["asyncio-redis"], "postgres": ["asyncpg"], "kafka": ["aiokafka"], - "mqtt": ["asyncio-mqtt"] + "mqtt": ["asyncio-mqtt"], "test": ["pytest", "pytest-asyncio"], }, classifiers=[ From c5562a1977950e1ac9b76a9b4a58c26ccfc3b2b0 Mon Sep 17 00:00:00 2001 From: Fernando Date: Sat, 20 Aug 2022 16:58:40 -0300 Subject: [PATCH 03/12] Uses double quotes for strings on new MQTT code --- broadcaster/_backends/mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py index 614a466..c9220ba 100644 --- a/broadcaster/_backends/mqtt.py +++ b/broadcaster/_backends/mqtt.py @@ -9,14 +9,14 @@ class MqttBackend(BroadcastBackend): def __init__(self, url: str): parsed_url = urlparse(url) self._host = parsed_url.hostname or "localhost" - self._port = 8883 if parsed_url.scheme == 'mqtts' else 1883 + self._port = 8883 if parsed_url.scheme == "mqtts" else 1883 self._port = parsed_url.port or self._port async def connect(self) -> None: self.stack = AsyncExitStack() self.client = asyncio_mqtt.Client(self._host, port=self._port) - self.messages = self.client.filtered_messages('#') + self.messages = self.client.filtered_messages("#") self.client = await self.stack.enter_async_context(self.client) self.messages = await self.stack.enter_async_context(self.messages) From 50307681ee798a0bf9c8db206b50446deb7052c9 Mon Sep 17 00:00:00 2001 From: Fernando Date: Sat, 20 Aug 2022 16:59:31 -0300 Subject: [PATCH 04/12] Fixes linting on new MQTT code --- broadcaster/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 2464b22..776fbe2 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -51,8 +51,9 @@ def __init__(self, url: str): self._backend = MemoryBackend(url) - elif parsed_url.scheme in ('mqtt', 'mqtts'): + elif parsed_url.scheme in ("mqtt", "mqtts"): from ._backends.mqtt import MqttBackend + self._backend = MqttBackend(url) async def __aenter__(self) -> "Broadcast": From 2a2bf1b40cb376b8a92b74baf3f5254913eb6b5a Mon Sep 17 00:00:00 2001 From: Fernando Date: Sat, 20 Aug 2022 17:01:48 -0300 Subject: [PATCH 05/12] Fixes blank lines in new MQTT code --- broadcaster/_backends/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py index c9220ba..f148aba 100644 --- a/broadcaster/_backends/mqtt.py +++ b/broadcaster/_backends/mqtt.py @@ -5,6 +5,7 @@ from .._base import Event from contextlib import AsyncExitStack, asynccontextmanager + class MqttBackend(BroadcastBackend): def __init__(self, url: str): parsed_url = urlparse(url) @@ -36,4 +37,3 @@ async def publish(self, channel: str, message: typing.Any) -> None: async def next_published(self) -> Event: async for message in self.messages: return Event(channel=message.topic, message=message.payload) - From 1e12bd81201970594f50168615eeb0e1b243a85b Mon Sep 17 00:00:00 2001 From: Fernando Date: Sat, 20 Aug 2022 17:06:30 -0300 Subject: [PATCH 06/12] Removes unused import from new MQTT code --- broadcaster/_backends/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py index f148aba..c3d5fd7 100644 --- a/broadcaster/_backends/mqtt.py +++ b/broadcaster/_backends/mqtt.py @@ -3,7 +3,7 @@ from urllib.parse import urlparse from .base import BroadcastBackend from .._base import Event -from contextlib import AsyncExitStack, asynccontextmanager +from contextlib import AsyncExitStack class MqttBackend(BroadcastBackend): From fb9a33ffcfc1bf846c929ecdc7e034773d287514 Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sat, 20 Aug 2022 17:31:56 -0300 Subject: [PATCH 07/12] Works around mypy problem MyPy would complain the method has a "missing return" even though the return is explicit. This seems to happen because of the "async" block. Putting the value that was returned in a variable and returning the variable from outside the "async for" block makes MyPy happy. --- broadcaster/_backends/mqtt.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py index c3d5fd7..f435e90 100644 --- a/broadcaster/_backends/mqtt.py +++ b/broadcaster/_backends/mqtt.py @@ -36,4 +36,7 @@ async def publish(self, channel: str, message: typing.Any) -> None: async def next_published(self) -> Event: async for message in self.messages: - return Event(channel=message.topic, message=message.payload) + event = Event(channel=message.topic, message=message.payload) + break + + return event From e8623789e2ccc22b6d81ba7bdcb243ba90d5e4b6 Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sat, 20 Aug 2022 17:34:02 -0300 Subject: [PATCH 08/12] Fixes import ordering --- broadcaster/_backends/mqtt.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/broadcaster/_backends/mqtt.py b/broadcaster/_backends/mqtt.py index f435e90..6fa825b 100644 --- a/broadcaster/_backends/mqtt.py +++ b/broadcaster/_backends/mqtt.py @@ -1,9 +1,11 @@ -import asyncio_mqtt import typing +from contextlib import AsyncExitStack from urllib.parse import urlparse -from .base import BroadcastBackend + +import asyncio_mqtt + from .._base import Event -from contextlib import AsyncExitStack +from .base import BroadcastBackend class MqttBackend(BroadcastBackend): From 1a084d771024b9b4023fc2583a5a9550c2e4f4c5 Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sun, 21 Aug 2022 02:26:18 -0300 Subject: [PATCH 09/12] The returned message type is bytes MQTT allows for binary messages, deciding for the user that we should convert it to a string would force us to also decide on an encoding. Lets leave this decision to the user. --- tests/test_broadcast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index f7279fc..4b6adaf 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -53,4 +53,4 @@ async def test_mqtt(): await broadcast.publish("chatroom", "hello") event = await subscriber.get() assert event.channel == "chatroom" - assert event.message == "hello" + assert event.message == b"hello" From af8f318b458c43727c62c6fc4f31fbac62642a5a Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sun, 21 Aug 2022 02:40:48 -0300 Subject: [PATCH 10/12] Configures MQTT to accept anonymous users for testing --- docker-compose.yaml | 3 ++- scripts/docker/mqtt/mosquitto-no-auth.conf | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 scripts/docker/mqtt/mosquitto-no-auth.conf diff --git a/docker-compose.yaml b/docker-compose.yaml index b38a1ba..03fcbaa 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -39,6 +39,7 @@ services: mqtt: image: "eclipse-mosquitto" hostname: mqtt + command: mosquitto -c /mosquitto-no-auth.conf ports: - 1883:1883 - - 8883:8883 + diff --git a/scripts/docker/mqtt/mosquitto-no-auth.conf b/scripts/docker/mqtt/mosquitto-no-auth.conf new file mode 100644 index 0000000..40dd92b --- /dev/null +++ b/scripts/docker/mqtt/mosquitto-no-auth.conf @@ -0,0 +1,5 @@ +# This is a Mosquitto configuration file that creates a listener on port 1883 +# that allows unauthenticated access. + +listener 1883 +allow_anonymous true From 8ce51e84956e96e58e5f16127a2ae0c932469cae Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sun, 21 Aug 2022 03:02:37 -0300 Subject: [PATCH 11/12] Adds the MQTT service to GitHub workflow --- .github/workflows/test-suite.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index e256667..4d009b2 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -51,6 +51,11 @@ jobs: POSTGRES_USER: postgres ports: - 5432:5432 + mqtt: + image: "eclipse-mosquitto" + ports: + - 1883:1883 + options: --hostname mqtt mosquitto -c /mosquitto-no-auth.conf steps: - uses: "actions/checkout@v2" From 0e48df3a129998036b2e454b5b54e5dddc00d8dc Mon Sep 17 00:00:00 2001 From: Fernando Governatore Date: Sun, 21 Aug 2022 03:07:20 -0300 Subject: [PATCH 12/12] Adds mqtt to the requirements file --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 02846f0..f8edf92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ --e .[redis,postgres,kafka] +-e .[redis,postgres,kafka,mqtt] # Documentation mkdocs==1.3.1