From 7cdb94c1a5045a863e32aa0a4ba670efd85cf557 Mon Sep 17 00:00:00 2001 From: caiquilipe Date: Mon, 26 Feb 2024 18:06:26 -0300 Subject: [PATCH 1/5] feat: implemented redis-py --- broadcaster/_backends/redis.py | 36 ++++++++++++++++++++++------------ broadcaster/_base.py | 23 ++++++++++++---------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 2c4aeba..2ffec45 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -1,7 +1,8 @@ -import typing +from typing import Any from urllib.parse import urlparse -import asyncio_redis +import redis.asyncio as redis +from redis.asyncio.client import PubSub from .._base import Event from .base import BroadcastBackend @@ -14,25 +15,34 @@ def __init__(self, url: str): self._port = parsed_url.port or 6379 self._password = parsed_url.password or None + self._sub_conn: PubSub | None = None + self._pub_conn: PubSub | None = None + async def connect(self) -> None: kwargs = {"host": self._host, "port": self._port, "password": self._password} - self._pub_conn = await asyncio_redis.Connection.create(**kwargs) - self._sub_conn = await asyncio_redis.Connection.create(**kwargs) - self._subscriber = await self._sub_conn.start_subscribe() + print(kwargs) + self._pub_conn = redis.Redis(**kwargs).pubsub() + self._sub_conn = redis.Redis(**kwargs).pubsub() async def disconnect(self) -> None: - self._pub_conn.close() - self._sub_conn.close() + await self._pub_conn.close() + await self._sub_conn.close() async def subscribe(self, channel: str) -> None: - await self._subscriber.subscribe([channel]) + await self._sub_conn.subscribe(channel) async def unsubscribe(self, channel: str) -> None: - await self._subscriber.unsubscribe([channel]) + await self._sub_conn.unsubscribe(channel) - async def publish(self, channel: str, message: typing.Any) -> None: - await self._pub_conn.publish(channel, message) + async def publish(self, channel: str, message: Any) -> None: + await self._pub_conn.execute_command("PUBLISH", channel, message) async def next_published(self) -> Event: - message = await self._subscriber.next_published() - return Event(channel=message.channel, message=message.value) + message = None + while not message: + message = await self._sub_conn.get_message(ignore_subscribe_messages=True, timeout=None) + event = Event( + channel=message["channel"].decode(), + message=message["data"].decode(), + ) + return event diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 4de1417..b570188 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -1,6 +1,6 @@ import asyncio from contextlib import asynccontextmanager -from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional +from typing import Any, AsyncGenerator, Optional, AsyncIterator from urllib.parse import urlparse @@ -26,13 +26,13 @@ class Unsubscribed(Exception): class Broadcast: def __init__(self, url: str): - from broadcaster._backends.base import BroadcastBackend + from ._backends.base import BroadcastBackend parsed_url = urlparse(url) self._backend: BroadcastBackend - self._subscribers: Dict[str, Any] = {} + self._subscribers: dict[str, set[asyncio.Queue]] = {} if parsed_url.scheme in ("redis", "rediss"): - from broadcaster._backends.redis import RedisBackend + from ._backends.redis import RedisBackend self._backend = RedisBackend(url) @@ -51,6 +51,8 @@ def __init__(self, url: str): self._backend = MemoryBackend(url) + self._listener_task: asyncio.Task | None = None + async def __aenter__(self) -> "Broadcast": await self.connect() return self @@ -60,13 +62,13 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: async def connect(self) -> None: await self._backend.connect() - self._listener_task = asyncio.create_task(self._listener()) async def disconnect(self) -> None: - if self._listener_task.done(): - self._listener_task.result() - else: - self._listener_task.cancel() + if self._listener_task: + if self._listener_task.done(): + self._listener_task.result() + else: + self._listener_task.cancel() await self._backend.disconnect() async def _listener(self) -> None: @@ -81,10 +83,11 @@ async def publish(self, channel: str, message: Any) -> None: @asynccontextmanager async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]: queue: asyncio.Queue = asyncio.Queue() - try: if not self._subscribers.get(channel): await self._backend.subscribe(channel) + if not self._listener_task: + self._listener_task = asyncio.create_task(self._listener()) self._subscribers[channel] = set([queue]) else: self._subscribers[channel].add(queue) From 07b6c89a525a8818c9fbc95e61a8c80d0880abf1 Mon Sep 17 00:00:00 2001 From: caiquilipe Date: Mon, 26 Feb 2024 20:09:14 -0300 Subject: [PATCH 2/5] feat: added ssl --- broadcaster/_backends/redis.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 2ffec45..83dd71d 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -14,15 +14,15 @@ def __init__(self, url: str): self._host = parsed_url.hostname or "localhost" self._port = parsed_url.port or 6379 self._password = parsed_url.password or None - + self._ssl = parsed_url.scheme == "rediss" + self.kwargs = {"host": self._host, "port": self._port, "password": self._password, "ssl": self._ssl} + self._sub_conn: PubSub | None = None self._pub_conn: PubSub | None = None async def connect(self) -> None: - kwargs = {"host": self._host, "port": self._port, "password": self._password} - print(kwargs) - self._pub_conn = redis.Redis(**kwargs).pubsub() - self._sub_conn = redis.Redis(**kwargs).pubsub() + self._pub_conn = redis.Redis(**self.kwargs).pubsub() + self._sub_conn = redis.Redis(**self.kwargs).pubsub() async def disconnect(self) -> None: await self._pub_conn.close() From 973055d15e952b38b2fcfb43b2a2071f91a04c04 Mon Sep 17 00:00:00 2001 From: caiquilipe Date: Mon, 26 Feb 2024 22:33:34 -0300 Subject: [PATCH 3/5] docs: refformated with black docs: refformated with black --- broadcaster/_backends/redis.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 83dd71d..14061cf 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -15,8 +15,13 @@ def __init__(self, url: str): self._port = parsed_url.port or 6379 self._password = parsed_url.password or None self._ssl = parsed_url.scheme == "rediss" - self.kwargs = {"host": self._host, "port": self._port, "password": self._password, "ssl": self._ssl} - + self.kwargs = { + "host": self._host, + "port": self._port, + "password": self._password, + "ssl": self._ssl, + } + self._sub_conn: PubSub | None = None self._pub_conn: PubSub | None = None @@ -40,7 +45,9 @@ async def publish(self, channel: str, message: Any) -> None: async def next_published(self) -> Event: message = None while not message: - message = await self._sub_conn.get_message(ignore_subscribe_messages=True, timeout=None) + message = await self._sub_conn.get_message( + ignore_subscribe_messages=True, timeout=None + ) event = Event( channel=message["channel"].decode(), message=message["data"].decode(), From d43d4736b206dcfbc2654c98bd30e034ab211a9e Mon Sep 17 00:00:00 2001 From: caiquilipe Date: Wed, 28 Feb 2024 17:43:05 -0300 Subject: [PATCH 4/5] fix: added retry case ConnectionError --- broadcaster/_backends/redis.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 14061cf..86fc66c 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -6,6 +6,7 @@ from .._base import Event from .base import BroadcastBackend +import asyncio class RedisBackend(BroadcastBackend): @@ -40,7 +41,12 @@ async def unsubscribe(self, channel: str) -> None: await self._sub_conn.unsubscribe(channel) async def publish(self, channel: str, message: Any) -> None: - await self._pub_conn.execute_command("PUBLISH", channel, message) + try: + await self._pub_conn.execute_command("PUBLISH", channel, message) + except redis.ConnectionError: + await asyncio.sleep(1) + self._pub_conn = redis.Redis(**self.kwargs).pubsub() + await self.publish(channel, message) async def next_published(self) -> Event: message = None From ce919903cfba41e6042cd438a049fc7ee64b7fa2 Mon Sep 17 00:00:00 2001 From: caiquilipe Date: Thu, 29 Feb 2024 10:59:03 -0300 Subject: [PATCH 5/5] feat: added retry on timeout --- broadcaster/_backends/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 86fc66c..7074d35 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -43,7 +43,7 @@ async def unsubscribe(self, channel: str) -> None: async def publish(self, channel: str, message: Any) -> None: try: await self._pub_conn.execute_command("PUBLISH", channel, message) - except redis.ConnectionError: + except (redis.ConnectionError, redis.TimeoutError): await asyncio.sleep(1) self._pub_conn = redis.Redis(**self.kwargs).pubsub() await self.publish(channel, message)