Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Broadcaster helps you develop realtime streaming functionality by providing
a simple broadcast API onto a number of different backend services.

It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.

<img src="https://raw.githubusercontent.com/encode/broadcaster/master/docs/demo.gif" alt='WebSockets Demo'>

Expand Down Expand Up @@ -83,6 +83,7 @@ Python 3.8+

* `Broadcast('memory://')`
* `Broadcast("redis://localhost:6379")`
* `Broadcast("redis-stream://localhost:6379")`
* `Broadcast("postgres://localhost:5432/broadcaster")`
* `Broadcast("kafka://localhost:9092")`

Expand All @@ -97,7 +98,6 @@ and pass it to the `broadcaster` via `backend` argument.
from broadcaster import Broadcaster, BroadcastBackend

class MyBackend(BroadcastBackend):
...

broadcaster = Broadcaster(backend=MyBackend())
```
Expand All @@ -112,6 +112,6 @@ state, make sure to strictly pin your requirements to `broadcaster==0.2.0`.
To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API...

* Serialization / deserialization to support broadcasting structured data.
* Backends for Redis Streams, Apache Kafka, and RabbitMQ.
* A backend for RabbitMQ.
* Add support for `subscribe('chatroom', history=100)` for backends which provide persistence. (Redis Streams, Apache Kafka) This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history.
* Support for pattern subscribes in backends that support it.
53 changes: 53 additions & 0 deletions broadcaster/_backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import typing

Expand Down Expand Up @@ -47,3 +49,54 @@ async def _pubsub_listener(self) -> None:
message=message["data"].decode(),
)
await self._queue.put(event)


StreamMessageType = typing.Tuple[bytes, typing.Tuple[typing.Tuple[bytes, typing.Dict[bytes, bytes]]]]


class RedisStreamBackend(BroadcastBackend):
def __init__(self, url: str):
url = url.replace("redis-stream", "redis", 1)
self.streams: dict[str, str] = {}
self._ready = asyncio.Event()
self._producer = redis.Redis.from_url(url)
self._consumer = redis.Redis.from_url(url)

async def connect(self) -> None:
pass

async def disconnect(self) -> None:
await self._producer.aclose()
await self._consumer.aclose()

async def subscribe(self, channel: str) -> None:
try:
info = await self._consumer.xinfo_stream(channel)
last_id = info["last-generated-id"]
except redis.ResponseError:
last_id = "0"
self.streams[channel] = last_id
self._ready.set()

async def unsubscribe(self, channel: str) -> None:
self.streams.pop(channel, None)

async def publish(self, channel: str, message: typing.Any) -> None:
await self._producer.xadd(channel, {"message": message})

async def wait_for_messages(self) -> list[StreamMessageType]:
await self._ready.wait()
messages = None
while not messages:
messages = await self._consumer.xread(self.streams, count=1, block=100)
return messages

async def next_published(self) -> Event:
messages = await self.wait_for_messages()
stream, events = messages[0]
_msg_id, message = events[0]
self.streams[stream.decode("utf-8")] = _msg_id.decode("utf-8")
return Event(
channel=stream.decode("utf-8"),
message=message.get(b"message", b"").decode("utf-8"),
)
7 changes: 6 additions & 1 deletion broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def _create_backend(self, url: str) -> BroadcastBackend:

return RedisBackend(url)

elif parsed_url.scheme == "redis-stream":
from broadcaster._backends.redis import RedisStreamBackend

return RedisStreamBackend(url)

elif parsed_url.scheme in ("postgres", "postgresql"):
from broadcaster._backends.postgres import PostgresBackend

Expand Down Expand Up @@ -88,7 +93,7 @@ async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]:
try:
if not self._subscribers.get(channel):
await self._backend.subscribe(channel)
self._subscribers[channel] = set([queue])
self._subscribers[channel] = {queue}
else:
self._subscribers[channel].add(queue)

Expand Down
15 changes: 15 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ async def test_redis():
assert event.message == "hello"


@pytest.mark.asyncio
async def test_redis_stream():
async with Broadcast("redis-stream://localhost:6379") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
async with broadcast.subscribe("chatroom1") as subscriber:
await broadcast.publish("chatroom1", "hello")
event = await subscriber.get()
assert event.channel == "chatroom1"
assert event.message == "hello"


@pytest.mark.asyncio
async def test_postgres():
async with Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast:
Expand Down