diff --git a/README.md b/README.md index 2283368..77b6dea 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ The HTML template for the front end [is available here](https://github.com/encod * `Broadcast('memory://')` * `Broadcast("redis://localhost:6379")` +* `Broadcast("redis-stream://localhost:6379")` * `Broadcast("postgres://localhost:5432/hostedapi")` ## Where next? @@ -87,6 +88,6 @@ state, make sure to strictly pin your requirements to `broadcaster==0.1.0`. To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API... * Serialization / deserialization to support broadcasting structured data. -* Backends for Redis Streams, Apache Kafka, and RabbitMQ. +* Backends for Apache Kafka, and RabbitMQ. * Add support for `subscribe('chatroom', history=100)` for backends which provide persistence. (Redis Streams, Apache Kafka) This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history. * Support for pattern subscribes in backends that support it. diff --git a/broadcaster/_backends/redis_stream.py b/broadcaster/_backends/redis_stream.py new file mode 100644 index 0000000..9e27ff5 --- /dev/null +++ b/broadcaster/_backends/redis_stream.py @@ -0,0 +1,43 @@ +import aioredis +import asyncio +import typing + +from .base import BroadcastBackend +from .._base import Event + + +class RedisStreamBackend(BroadcastBackend): + def __init__(self, url: str): + self.conn_url = url.replace('redis-stream', 'redis', 1) + self.streams: typing.Set = set() + + async def connect(self) -> None: + loop = asyncio.get_event_loop() + self._producer = await aioredis.create_redis(self.conn_url, loop=loop) + self._consumer = await aioredis.create_redis(self.conn_url, loop=loop) + + async def disconnect(self) -> None: + self._producer.close() + self._consumer.close() + + async def subscribe(self, channel: str) -> None: + self.streams.add(channel) + + async def unsubscribe(self, channel: str) -> None: + await self.streams.discard(channel) + + async def publish(self, channel: str, message: typing.Any) -> None: + await self._producer.xadd(channel, {'message': message}) + + async def _wait_for_streams(self) -> None: + while not self.streams: + await asyncio.sleep(1) + + async def next_published(self) -> Event: + await self._wait_for_streams() + messages = await self._consumer.xread(list(self.streams)) + stream, _msg_id, message = messages[0] + return Event( + channel=stream.decode('utf-8'), + message=message.get(b'message').decode('utf-8'), + ) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 3e73df4..833a630 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -32,6 +32,10 @@ def __init__(self, url: str): from ._backends.redis import RedisBackend self._backend = RedisBackend(url) + elif parsed_url.scheme == 'redis-stream': + from ._backends.redis_stream import RedisStreamBackend + self._backend = RedisStreamBackend(url) + elif parsed_url.scheme in ('postgres', 'postgresql'): from ._backends.postgres import PostgresBackend self._backend = PostgresBackend(url) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 6bac3db..6d14c9e 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -23,6 +23,16 @@ async def test_redis(): assert event.message == 'hello' +@pytest.mark.asyncio +async def test_redis_stream(): + async with Broadcast('redis-stream://localhost:6379') as broadcast: + async with broadcast.subscribe('chatroom') as subscriber: + await broadcast.publish('chatroom', 'hello') + event = await subscriber.get() + assert event.channel == 'chatroom' + assert event.message == 'hello' + + @pytest.mark.asyncio async def test_postgres(): async with Broadcast('postgres://localhost:5432/hostedapi') as broadcast: