diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 2c4aeba..78ed7eb 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -1,7 +1,7 @@ +import asyncio import typing -from urllib.parse import urlparse -import asyncio_redis +from redis import asyncio as redis from .._base import Event from .base import BroadcastBackend @@ -9,30 +9,41 @@ class RedisBackend(BroadcastBackend): def __init__(self, url: str): - parsed_url = urlparse(url) - self._host = parsed_url.hostname or "localhost" - self._port = parsed_url.port or 6379 - self._password = parsed_url.password or None + self._conn = redis.Redis.from_url(url) + self._pubsub = self._conn.pubsub() + self._ready = asyncio.Event() + self._queue: asyncio.Queue[Event] = asyncio.Queue() + self._listener = asyncio.create_task(self._pubsub_listener()) async def connect(self) -> None: - kwargs = {"host": self._host, "port": self._port, "password": self._password} - self._pub_conn = await asyncio_redis.Connection.create(**kwargs) - self._sub_conn = await asyncio_redis.Connection.create(**kwargs) - self._subscriber = await self._sub_conn.start_subscribe() + await self._pubsub.connect() async def disconnect(self) -> None: - self._pub_conn.close() - self._sub_conn.close() + await self._pubsub.aclose() + await self._conn.aclose() + self._listener.cancel() async def subscribe(self, channel: str) -> None: - await self._subscriber.subscribe([channel]) + self._ready.set() + await self._pubsub.subscribe(channel) async def unsubscribe(self, channel: str) -> None: - await self._subscriber.unsubscribe([channel]) + await self._pubsub.unsubscribe(channel) async def publish(self, channel: str, message: typing.Any) -> None: - await self._pub_conn.publish(channel, message) + await self._conn.publish(channel, message) async def next_published(self) -> Event: - message = await self._subscriber.next_published() - return Event(channel=message.channel, message=message.value) + return await self._queue.get() + + async def _pubsub_listener(self) -> None: + # redis-py does not listen to the pubsub connection if there are no channels subscribed + # so we need to wait until the first channel is subscribed to start listening + await self._ready.wait() + async for message in self._pubsub.listen(): + if message["type"] == "message": + event = Event( + channel=message["channel"].decode(), + message=message["data"].decode(), + ) + await self._queue.put(event) diff --git a/pyproject.toml b/pyproject.toml index ef59da4..10f7cc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ ] [project.optional-dependencies] -redis = ["asyncio-redis"] +redis = ["redis"] postgres = ["asyncpg"] kafka = ["aiokafka"] test = ["pytest", "pytest-asyncio"]