From 38b8f78f7f68b2f249907d51060af78a9074f91d Mon Sep 17 00:00:00 2001
From: tsotne
Date: Tue, 24 Oct 2023 18:02:00 +0400
Subject: [PATCH 1/6] Fixed #55 - Not Close Subscribing

---
 broadcaster/_base.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/broadcaster/_base.py b/broadcaster/_base.py
index 0166034..4d994f5 100644
--- a/broadcaster/_base.py
+++ b/broadcaster/_base.py
@@ -88,7 +88,7 @@ async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]:
 
         try:
             if not self._subscribers.get(channel):
                 await self._backend.subscribe(channel)
-                self._subscribers[channel] = set([queue])
+                self._subscribers[channel] = {queue}
             else:
                 self._subscribers[channel].add(queue)

From 0677f3dc0b5984a2abc7b893616e5575b506add9 Mon Sep 17 00:00:00 2001
From: tsotne
Date: Tue, 24 Oct 2023 18:17:32 +0400
Subject: [PATCH 2/6] Added Redis Stream backend

---
 README.md                      |  3 ++-
 broadcaster/_backends/redis.py | 45 ++++++++++++++++++++++++++++++++++
 broadcaster/_base.py           |  5 ++++
 3 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index d11a9ed..c76ac6b 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@ Python 3.8+
 
 * `Broadcast('memory://')`
 * `Broadcast("redis://localhost:6379")`
+* `Broadcast("redis-stream://localhost:6379")`
 * `Broadcast("postgres://localhost:5432/broadcaster")`
 * `Broadcast("kafka://localhost:9092")`
 
@@ -111,6 +112,6 @@ state, make sure to strictly pin your requirements to `broadcaster==0.2.0`.
 To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API...
 
 * Serialization / deserialization to support broadcasting structured data.
-* Backends for Redis Streams, Apache Kafka, and RabbitMQ.
+* Backends for Apache Kafka, and RabbitMQ.
 * Add support for `subscribe('chatroom', history=100)` for backends which provide persistence. (Redis Streams, Apache Kafka) This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history.
 * Support for pattern subscribes in backends that support it.
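The `redis-stream://` URL added to the README above is used exactly like the existing schemes. For reference, a minimal usage sketch (assuming a Redis server on `localhost:6379`; this mirrors the test added later in this series rather than introducing anything new):

```python
import asyncio

from broadcaster import Broadcast


async def main() -> None:
    # "redis-stream://" selects the Redis Streams backend introduced below;
    # internally the URL is rewritten to a plain "redis://" connection string.
    async with Broadcast("redis-stream://localhost:6379") as broadcast:
        async with broadcast.subscribe("chatroom") as subscriber:
            await broadcast.publish("chatroom", "hello")
            event = await subscriber.get()
            print(event.channel, event.message)  # chatroom hello


asyncio.run(main())
```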
diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py
index 78ed7eb..26f89af 100644
--- a/broadcaster/_backends/redis.py
+++ b/broadcaster/_backends/redis.py
@@ -47,3 +47,48 @@ async def _pubsub_listener(self) -> None:
                 message=message["data"].decode(),
             )
             await self._queue.put(event)
+
+
+class RedisStreamBackend(BroadcastBackend):
+    def __init__(self, url: str):
+        self.conn_url = url.replace('redis-stream', 'redis', 1)
+        self.streams: typing.Dict = dict()
+        self._ready = asyncio.Event()
+        self._producer = redis.Redis.from_url(self.conn_url)
+        self._consumer = redis.Redis.from_url(self.conn_url)
+
+    async def connect(self) -> None:
+        pass
+
+    async def disconnect(self) -> None:
+        await self._producer.close()
+        await self._consumer.close()
+
+    async def subscribe(self, channel: str) -> None:
+        info = await self._consumer.xinfo_stream(channel)
+        last_id = info['last-generated-id']
+        self.streams[channel] = last_id
+        self._ready.set()
+
+    async def unsubscribe(self, channel: str) -> None:
+        self.streams.pop(channel, None)
+
+    async def publish(self, channel: str, message: typing.Any) -> None:
+        await self._producer.xadd(channel, {'message': message})
+
+    async def wait_for_messages(self) -> typing.List:
+        await self._ready.wait()
+        messages = None
+        while not messages:
+            messages = await self._consumer.xread(self.streams, count=1, block=1000)
+        return messages
+
+    async def next_published(self) -> Event:
+        messages = await self.wait_for_messages()
+        stream, events = messages[0]
+        _msg_id, message = events[0]
+        self.streams[stream.decode('utf-8')] = _msg_id.decode('utf-8')
+        return Event(
+            channel=stream.decode('utf-8'),
+            message=message.get(b'message').decode('utf-8'),
+        )
\ No newline at end of file
diff --git a/broadcaster/_base.py b/broadcaster/_base.py
index 4d994f5..0cdb4a7 100644
--- a/broadcaster/_base.py
+++ b/broadcaster/_base.py
@@ -38,6 +38,11 @@ def _create_backend(self, url: str) -> BroadcastBackend:
 
             return RedisBackend(url)
 
+        elif parsed_url.scheme == "redis-stream":
+            from broadcaster._backends.redis import RedisStreamBackend
+
+            self._backend = RedisStreamBackend(url)
+
         elif parsed_url.scheme in ("postgres", "postgresql"):
             from broadcaster._backends.postgres import PostgresBackend

From 8e08b8756ab433e418843fbcb2bc113b9fb38fdd Mon Sep 17 00:00:00 2001
From: tsotne
Date: Wed, 25 Oct 2023 09:53:13 +0400
Subject: [PATCH 3/6] Fixed Linting Test

---
 broadcaster/_backends/redis.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py
index 26f89af..6c1fd6e 100644
--- a/broadcaster/_backends/redis.py
+++ b/broadcaster/_backends/redis.py
@@ -51,7 +51,7 @@ async def _pubsub_listener(self) -> None:
 
 class RedisStreamBackend(BroadcastBackend):
     def __init__(self, url: str):
-        self.conn_url = url.replace('redis-stream', 'redis', 1)
+        self.conn_url = url.replace("redis-stream", "redis", 1)
         self.streams: typing.Dict = dict()
         self._ready = asyncio.Event()
         self._producer = redis.Redis.from_url(self.conn_url)
@@ -65,8 +65,11 @@ async def disconnect(self) -> None:
         await self._consumer.close()
 
     async def subscribe(self, channel: str) -> None:
-        info = await self._consumer.xinfo_stream(channel)
-        last_id = info['last-generated-id']
+        try:
+            info = await self._consumer.xinfo_stream(channel)
+            last_id = info["last-generated-id"]
+        except aioredis.exceptions.ResponseError:
+            last_id = "0"
         self.streams[channel] = last_id
         self._ready.set()
 
@@ -74,7 +77,7 @@ async def unsubscribe(self, channel: str) -> None:
         self.streams.pop(channel, None)
 
     async def publish(self, channel: str, message: typing.Any) -> None:
-        await self._producer.xadd(channel, {'message': message})
+        await self._producer.xadd(channel, {"message": message})
 
     async def wait_for_messages(self) -> typing.List:
         await self._ready.wait()
         messages = None
         while not messages:
@@ -87,8 +90,8 @@ async def next_published(self) -> Event:
         messages = await self.wait_for_messages()
         stream, events = messages[0]
         _msg_id, message = events[0]
-        self.streams[stream.decode('utf-8')] = _msg_id.decode('utf-8')
+        self.streams[stream.decode("utf-8")] = _msg_id.decode("utf-8")
         return Event(
-            channel=stream.decode('utf-8'),
-            message=message.get(b'message').decode('utf-8'),
+            channel=stream.decode("utf-8"),
+            message=message.get(b"message").decode("utf-8"),
         )
\ No newline at end of file

From 722aaa1a4800aacd3280262d7d6cff70fc3fbe1d Mon Sep 17 00:00:00 2001
From: tsotne
Date: Wed, 25 Oct 2023 11:43:05 +0400
Subject: [PATCH 4/6] Added Test For Redis Stream Backend

---
 tests/test_broadcast.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py
index b516ee2..d418508 100644
--- a/tests/test_broadcast.py
+++ b/tests/test_broadcast.py
@@ -55,6 +55,21 @@ async def test_redis():
         assert event.message == "hello"
 
 
+@pytest.mark.asyncio
+async def test_redis_stream():
+    async with Broadcast("redis-stream://localhost:6379") as broadcast:
+        async with broadcast.subscribe("chatroom") as subscriber:
+            await broadcast.publish("chatroom", "hello")
+            event = await subscriber.get()
+            assert event.channel == "chatroom"
+            assert event.message == "hello"
+        async with broadcast.subscribe("chatroom1") as subscriber:
+            await broadcast.publish("chatroom1", "hello")
+            event = await subscriber.get()
+            assert event.channel == "chatroom1"
+            assert event.message == "hello"
+
+
 @pytest.mark.asyncio
 async def test_postgres():
     async with Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast:

From 9f0a84b26a51ba58bac777fb132e41f2fae7ac94 Mon Sep 17 00:00:00 2001
From: "alex.oleshkevich"
Date: Wed, 10 Apr 2024 14:58:15 +0200
Subject: [PATCH 5/6] align with master

---
 broadcaster/_backends/redis.py | 27 ++++++++++++++++-----------
 broadcaster/_base.py           |  2 +-
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py
index 6c1fd6e..e1f279e 100644
--- a/broadcaster/_backends/redis.py
+++ b/broadcaster/_backends/redis.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import asyncio
 import typing
 
@@ -49,26 +51,29 @@ async def _pubsub_listener(self) -> None:
             await self._queue.put(event)
 
 
+StreamMessageType = typing.Tuple[bytes, typing.Tuple[typing.Tuple[bytes, typing.Dict[bytes, bytes]]]]
+
+
 class RedisStreamBackend(BroadcastBackend):
     def __init__(self, url: str):
-        self.conn_url = url.replace("redis-stream", "redis", 1)
-        self.streams: typing.Dict = dict()
+        url = url.replace("redis-stream", "redis", 1)
+        self.streams: dict[str, str] = {}
         self._ready = asyncio.Event()
-        self._producer = redis.Redis.from_url(self.conn_url)
-        self._consumer = redis.Redis.from_url(self.conn_url)
+        self._producer = redis.Redis.from_url(url)
+        self._consumer = redis.Redis.from_url(url)
 
     async def connect(self) -> None:
         pass
 
     async def disconnect(self) -> None:
-        await self._producer.close()
-        await self._consumer.close()
+        await self._producer.aclose()
+        await self._consumer.aclose()
 
     async def subscribe(self, channel: str) -> None:
         try:
             info = await self._consumer.xinfo_stream(channel)
             last_id = info["last-generated-id"]
-        except aioredis.exceptions.ResponseError:
+        except redis.ResponseError:
             last_id = "0"
         self.streams[channel] = last_id
         self._ready.set()
@@ -79,11 +84,11 @@ async def unsubscribe(self, channel: str) -> None:
         self.streams.pop(channel, None)
 
     async def publish(self, channel: str, message: typing.Any) -> None:
         await self._producer.xadd(channel, {"message": message})
 
-    async def wait_for_messages(self) -> typing.List:
+    async def wait_for_messages(self) -> list[StreamMessageType]:
        	await self._ready.wait()
         messages = None
         while not messages:
-            messages = await self._consumer.xread(self.streams, count=1, block=1000)
+            messages = await self._consumer.xread(self.streams, count=1, block=100)
         return messages
 
     async def next_published(self) -> Event:
@@ -93,5 +98,5 @@ async def next_published(self) -> Event:
         self.streams[stream.decode("utf-8")] = _msg_id.decode("utf-8")
         return Event(
             channel=stream.decode("utf-8"),
-            message=message.get(b"message").decode("utf-8"),
-        )
\ No newline at end of file
+            message=message.get(b"message", b"").decode("utf-8"),
+        )
diff --git a/broadcaster/_base.py b/broadcaster/_base.py
index 0cdb4a7..4650e0a 100644
--- a/broadcaster/_base.py
+++ b/broadcaster/_base.py
@@ -41,7 +41,7 @@ def _create_backend(self, url: str) -> BroadcastBackend:
         elif parsed_url.scheme == "redis-stream":
             from broadcaster._backends.redis import RedisStreamBackend
 
-            self._backend = RedisStreamBackend(url)
+            return RedisStreamBackend(url)
 
         elif parsed_url.scheme in ("postgres", "postgresql"):
             from broadcaster._backends.postgres import PostgresBackend

From e9ec0e18b57df242e9c4e9e200d7505a9b3a1180 Mon Sep 17 00:00:00 2001
From: "alex.oleshkevich"
Date: Mon, 22 Apr 2024 17:20:15 +0200
Subject: [PATCH 6/6] update docs

---
 README.md | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index c76ac6b..cb6cfc2 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
 Broadcaster helps you develop realtime streaming functionality by providing
 a simple broadcast API onto a number of different backend services.
 
-It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
+It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
 
 WebSockets Demo
@@ -98,9 +98,10 @@ and pass it to the `broadcaster` via `backend` argument.
 from broadcaster import Broadcaster, BroadcastBackend
 
 class MyBackend(BroadcastBackend):
-    ...
 
 broadcaster = Broadcaster(backend=MyBackend())
+```
+
 
 ## Where next?
@@ -112,6 +113,6 @@ state, make sure to strictly pin your requirements to `broadcaster==0.2.0`.
 To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API...
 
 * Serialization / deserialization to support broadcasting structured data.
-* Backends for Apache Kafka, and RabbitMQ.
+* A backend for RabbitMQ.
 * Add support for `subscribe('chatroom', history=100)` for backends which provide persistence. (Redis Streams, Apache Kafka) This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history.
 * Support for pattern subscribes in backends that support it.
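On the `history=100` idea above: the Redis Streams backend keeps the last-delivered entry id per channel, so an initial window could plausibly be read straight from the stream before tailing new entries. A rough, hypothetical sketch (not part of this PR; `read_history` and its arguments are illustrative only), using the same `redis.asyncio` client the backend uses:

```python
import asyncio

import redis.asyncio as redis


async def read_history(url: str, channel: str, history: int = 100):
    """Return the last `history` entries of a stream as (id, message) pairs."""
    conn = redis.Redis.from_url(url)
    # XREVRANGE returns the newest entries first; reverse for chronological order.
    entries = await conn.xrevrange(channel, count=history)
    await conn.aclose()
    return [
        (entry_id.decode("utf-8"), fields.get(b"message", b"").decode("utf-8"))
        for entry_id, fields in reversed(entries)
    ]


print(asyncio.run(read_history("redis://localhost:6379", "chatroom")))
```

A subscriber could replay this window first and then continue from the newest returned id via `XREAD`, which is essentially what `wait_for_messages` already does for live traffic.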