diff --git a/README.md b/README.md
index 0bff43b..eb547ab 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
 Broadcaster helps you develop realtime streaming functionality by providing
 a simple broadcast API onto a number of different backend services.
 
-It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
+It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
 
 
 WebSockets Demo
@@ -83,6 +83,7 @@ Python 3.8+
 
 * `Broadcast('memory://')`
 * `Broadcast("redis://localhost:6379")`
+* `Broadcast("redis-stream://localhost:6379")`
 * `Broadcast("postgres://localhost:5432/broadcaster")`
 * `Broadcast("kafka://localhost:9092")`
 
@@ -97,7 +98,7 @@ and pass it to the `broadcaster` via `backend` argument.
 from broadcaster import Broadcaster, BroadcastBackend
 
 class MyBackend(BroadcastBackend):
     ...
 
 broadcaster = Broadcaster(backend=MyBackend())
 ```
@@ -112,6 +112,6 @@ state, make sure to strictly pin your requirements to `broadcaster==0.2.0`.
 To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API...
 
 * Serialization / deserialization to support broadcasting structured data.
-* Backends for Redis Streams, Apache Kafka, and RabbitMQ.
+* A backend for RabbitMQ.
 * Add support for `subscribe('chatroom', history=100)` for backends which provide persistence.
(Redis Streams, Apache Kafka) This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history.
 * Support for pattern subscribes in backends that support it.
diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py
index 78ed7eb..e1f279e 100644
--- a/broadcaster/_backends/redis.py
+++ b/broadcaster/_backends/redis.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import asyncio
 import typing
 
@@ -47,3 +49,69 @@ async def _pubsub_listener(self) -> None:
                     message=message["data"].decode(),
                 )
                 await self._queue.put(event)
+
+
+StreamMessageType = typing.Tuple[bytes, typing.List[typing.Tuple[bytes, typing.Dict[bytes, bytes]]]]
+
+
+class RedisStreamBackend(BroadcastBackend):
+    """Broadcast backend built on Redis Streams (XADD / XREAD).
+
+    Unlike PUB/SUB, stream entries persist: the backend tracks the
+    last-seen message id per channel and reads anything newer.
+    """
+
+    def __init__(self, url: str):
+        url = url.replace("redis-stream", "redis", 1)
+        # Maps channel name -> last-read message id, passed to XREAD.
+        self.streams: dict[str, str] = {}
+        # Set while at least one channel is subscribed; gates XREAD calls.
+        self._ready = asyncio.Event()
+        self._producer = redis.Redis.from_url(url)
+        self._consumer = redis.Redis.from_url(url)
+
+    async def connect(self) -> None:
+        pass
+
+    async def disconnect(self) -> None:
+        await self._producer.aclose()
+        await self._consumer.aclose()
+
+    async def subscribe(self, channel: str) -> None:
+        try:
+            info = await self._consumer.xinfo_stream(channel)
+            last_id = info["last-generated-id"]
+        except redis.ResponseError:
+            # Stream does not exist yet; start reading from the beginning.
+            last_id = "0"
+        self.streams[channel] = last_id
+        self._ready.set()
+
+    async def unsubscribe(self, channel: str) -> None:
+        self.streams.pop(channel, None)
+        if not self.streams:
+            # Block readers again: XREAD with an empty stream dict is an error.
+            self._ready.clear()
+
+    async def publish(self, channel: str, message: typing.Any) -> None:
+        await self._producer.xadd(channel, {"message": message})
+
+    async def wait_for_messages(self) -> list[StreamMessageType]:
+        messages = None
+        while not messages:
+            # Re-check readiness on every pass so a concurrent unsubscribe
+            # cannot leave us calling XREAD with an empty stream dict.
+            await self._ready.wait()
+            messages = await self._consumer.xread(self.streams, count=1, block=100)
+        return messages
+
+    async def next_published(self) -> Event:
+        messages = await self.wait_for_messages()
+        stream, events = messages[0]
+        _msg_id, message = events[0]
+        # Advance the cursor so this message is not delivered twice.
+        self.streams[stream.decode("utf-8")] = _msg_id.decode("utf-8")
+        return Event(
+            channel=stream.decode("utf-8"),
+            message=message.get(b"message", b"").decode("utf-8"),
+        )
diff --git a/broadcaster/_base.py b/broadcaster/_base.py
index 0166034..4650e0a 100644
--- a/broadcaster/_base.py
+++ b/broadcaster/_base.py
@@ -38,6 +38,11 @@ def _create_backend(self, url: str) -> BroadcastBackend:
 
             return RedisBackend(url)
 
+        elif parsed_url.scheme == "redis-stream":
+            from broadcaster._backends.redis import RedisStreamBackend
+
+            return RedisStreamBackend(url)
+
         elif parsed_url.scheme in ("postgres", "postgresql"):
             from broadcaster._backends.postgres import PostgresBackend
 
@@ -88,7 +93,7 @@ async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]:
         try:
             if not self._subscribers.get(channel):
                 await self._backend.subscribe(channel)
-                self._subscribers[channel] = set([queue])
+                self._subscribers[channel] = {queue}
             else:
                 self._subscribers[channel].add(queue)
 
diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py
index b516ee2..d418508 100644
--- a/tests/test_broadcast.py
+++ b/tests/test_broadcast.py
@@ -55,6 +55,21 @@ async def test_redis():
         assert event.message == "hello"
 
 
+@pytest.mark.asyncio
+async def test_redis_stream():
+    async with Broadcast("redis-stream://localhost:6379") as broadcast:
+        async with broadcast.subscribe("chatroom") as subscriber:
+            await broadcast.publish("chatroom", "hello")
+            event = await subscriber.get()
+            assert event.channel == "chatroom"
+            assert event.message == "hello"
+        async with broadcast.subscribe("chatroom1") as subscriber:
+            await broadcast.publish("chatroom1", "hello")
+            event = await subscriber.get()
+            assert event.channel == "chatroom1"
+            assert event.message == "hello"
+
+
 @pytest.mark.asyncio
 async def test_postgres():
     async with
Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast: