From 9e012aa054b3afc78c286eb00d90c70c3f610970 Mon Sep 17 00:00:00 2001 From: MisLink Date: Fri, 28 Aug 2020 02:55:48 +0800 Subject: [PATCH] Replace asyncio_redis to aioredis Signed-off-by: MisLink --- broadcaster/_backends/redis.py | 30 ++++++++++++++++++------------ broadcaster/_base.py | 17 +++++++++++++---- setup.py | 4 ++-- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 096a033..66d30ad 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -1,34 +1,40 @@ -import asyncio_redis import typing -from urllib.parse import urlparse + +import aioredis +from aioredis.pubsub import Receiver + from .base import BroadcastBackend from .._base import Event class RedisBackend(BroadcastBackend): def __init__(self, url: str): - parsed_url = urlparse(url) - self._host = parsed_url.hostname or "localhost" - self._port = parsed_url.port or 6379 + self._url = url async def connect(self) -> None: - self._pub_conn = await asyncio_redis.Connection.create(self._host, self._port) - self._sub_conn = await asyncio_redis.Connection.create(self._host, self._port) - self._subscriber = await self._sub_conn.start_subscribe() + self._pub_conn = await aioredis.create_redis(self._url) + self._sub_conn = await aioredis.create_redis(self._url) + self._receiver = Receiver() async def disconnect(self) -> None: + self._receiver.stop() self._pub_conn.close() self._sub_conn.close() + await self._pub_conn.wait_closed() + await self._sub_conn.wait_closed() async def subscribe(self, channel: str) -> None: - await self._subscriber.subscribe([channel]) + await self._sub_conn.subscribe(self._receiver.channel(channel)) async def unsubscribe(self, channel: str) -> None: - await self._subscriber.unsubscribe([channel]) + await self._sub_conn.unsubscribe(channel) async def publish(self, channel: str, message: typing.Any) -> None: await self._pub_conn.publish(channel, message) async def next_published(self) -> Event: - message = await self._subscriber.next_published() - return Event(channel=message.channel, message=message.value) + message = await self._receiver.get(encoding="utf8") + if message is None: + raise aioredis.ChannelClosedError() + channel, message = message + return Event(channel=channel.name.decode("utf8"), message=message) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 44ec030..f801f6e 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -3,6 +3,8 @@ from contextlib import asynccontextmanager from urllib.parse import urlparse +import aioredis + class Event: def __init__(self, channel, message): @@ -63,10 +65,17 @@ async def disconnect(self) -> None: await self._backend.disconnect() async def _listener(self) -> None: - while True: - event = await self._backend.next_published() - for queue in list(self._subscribers.get(event.channel, [])): - await queue.put(event) + try: + while True: + event = await self._backend.next_published() + for queue in list(self._subscribers.get(event.channel, [])): + await queue.put(event) + except asyncio.CancelledError: + raise + # for aioredis + except aioredis.ChannelClosedError: + pass + async def publish(self, channel: str, message: typing.Any) -> None: await self._backend.publish(channel, message) diff --git a/setup.py b/setup.py index 75229e5..187042e 100644 --- a/setup.py +++ b/setup.py @@ -47,9 +47,9 @@ def get_packages(package): author_email="tom@tomchristie.com", packages=get_packages("broadcaster"), extras_require={ - "redis": ["asyncio-redis"], + "redis": ["aioredis"], "postgres": ["asyncpg"], - "kafka": ["aiokafka"] + "kafka": ["aiokafka"], }, classifiers=[ "Development Status :: 3 - Alpha",