Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ jobs:
- 5432:5432

steps:
# eclipse-mosquitto does not support configuration via environment variables
# so it is not possible to use github's service feature to start the broker
# instead, we start the broker using docker run command and stop it at the end of the job
- name: "Start MQTT broker"
run: "docker run -d -p 1883:1883 --name mqtt eclipse-mosquitto:2.0-openssl mosquitto -c /mosquitto-no-auth.conf"
Comment on lines +56 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have tried to keep to a strict style within steps of:

  name: "Name"
  run: scripts/<something>

I'd prefer not to break that if possible.

- uses: "actions/checkout@v2"
- uses: "actions/setup-python@v2"
with:
Expand All @@ -63,7 +68,13 @@ jobs:
run: "scripts/check"
- name: "Build package & docs"
run: "scripts/build"


- name: "Run tests"
run: "scripts/test"
- name: "Enforce coverage"
run: "scripts/coverage"

- name: "Stop MQTT broker"
if: always()
run: "docker stop mqtt"
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Broadcaster helps you develop realtime streaming functionality by providing
a simple broadcast API onto a number of different backend services.

It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), [MQTT](https://mqtt.org/) plus a simple in-memory backend, that you can use for local development or during testing.

<img src="https://raw.githubusercontent.com/encode/broadcaster/master/docs/demo.gif" alt='WebSockets Demo'>

Expand Down Expand Up @@ -78,6 +78,7 @@ Python 3.8+
* `pip install broadcaster[redis]`
* `pip install broadcaster[postgres]`
* `pip install broadcaster[kafka]`
* `pip install broadcaster[mqtt]`

## Available backends

Expand All @@ -86,6 +87,7 @@ Python 3.8+
* `Broadcast("redis-stream://localhost:6379")`
* `Broadcast("postgres://localhost:5432/broadcaster")`
* `Broadcast("kafka://localhost:9092")`
* `Broadcast("mqtt://localhost:1883")`


### Using custom backends
Expand Down
54 changes: 54 additions & 0 deletions broadcaster/_backends/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncio
import typing
from urllib.parse import urlparse

import aiomqtt

from .._base import Event
from .base import BroadcastBackend


class MqttBackend(BroadcastBackend):
def __init__(self, url: str):
parsed_url = urlparse(url)
self._host = parsed_url.hostname or "localhost"
self._port = 8883 if parsed_url.scheme == "mqtts" else 1883
self._port = parsed_url.port or self._port
self._client = aiomqtt.Client(self._host, port=self._port)
self._queue: asyncio.Queue[aiomqtt.Message] = asyncio.Queue()
self._listener_task = asyncio.create_task(self._listener())

async def connect(self) -> None:
await self._client.__aenter__()

async def disconnect(self) -> None:
self._listener_task.cancel()
try:
await self._listener_task
except asyncio.CancelledError:
pass

await self._client.__aexit__(None, None, None)

async def subscribe(self, channel: str) -> None:
await self._client.subscribe(channel)

async def unsubscribe(self, channel: str) -> None:
await self._client.unsubscribe(channel)

async def publish(self, channel: str, message: typing.Any) -> None:
await self._client.publish(channel, message, retain=False)

async def next_published(self) -> Event:
message = await self._queue.get()

# Event.message is string, not bytes
# this is a limiting factor and we need to make sure
# that the payload is bytes in order to properly decode it
assert isinstance(message.payload, bytes), "Payload must be bytes."

return Event(channel=message.topic.value, message=message.payload.decode())

async def _listener(self) -> None:
async for message in self._client.messages:
await self._queue.put(message)
6 changes: 6 additions & 0 deletions broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ def _create_backend(self, url: str) -> BroadcastBackend:
from broadcaster._backends.memory import MemoryBackend

return MemoryBackend(url)

elif parsed_url.scheme in ("mqtt", "mqtts"):
from ._backends.mqtt import MqttBackend

return MqttBackend(url)

raise ValueError(f"Unsupported backend: {parsed_url.scheme}")

async def __aenter__(self) -> Broadcast:
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,10 @@ services:
- POSTGRES_USER=postgres
ports:
- 5432:5432
mqtt:
image: "eclipse-mosquitto"
hostname: mqtt
command: mosquitto -c /mosquitto-no-auth.conf
ports:
- 1883:1883

1 change: 1 addition & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env
| kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` |
| redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` |
| postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` |
| mqtt | `export BROADCAST_URL=mqtt://localhost:1883` | `docker-compose up mqtt` |
2 changes: 1 addition & 1 deletion example/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ uvicorn
websockets
starlette
jinja2
broadcaster[redis,postgres,kafka]
broadcaster[redis,postgres,kafka,mqtt]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
redis = ["redis"]
postgres = ["asyncpg"]
kafka = ["aiokafka"]
mqtt = ["aiomqtt"]
test = ["pytest", "pytest-asyncio"]

[project.urls]
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-e .[redis,postgres,kafka]
-e .[redis,postgres,kafka,mqtt]

# Documentation
mkdocs==1.5.3
Expand Down
7 changes: 4 additions & 3 deletions scripts/start
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/sh -e
# Accepted values: postgres, kafka, redis
# Accepted values: postgres, kafka, redis, mqtt
# If no variable provided all services will start
if [ -n "$1" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "mqtt" ]; then
echo "Not a valid value. Choose one or none:
kafka
redis
postgres ";
postgres
mqtt ";
exit 1;
fi
fi
Expand Down
10 changes: 10 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ async def test_kafka():
assert event.message == "hello"


@pytest.mark.asyncio
async def test_mqtt():
async with Broadcast("mqtt://localhost:1883") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"


@pytest.mark.asyncio
async def test_custom():
backend = CustomBackend("")
Expand Down