Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11-dev"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]

services:
zookeeper:
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ._base import Broadcast, Event
from ._backends.base import BroadcastBackend
from ._base import Broadcast, Event

__version__ = "0.2.0"
__all__ = ["Broadcast", "Event", "BroadcastBackend"]
4 changes: 3 additions & 1 deletion broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import typing
from urllib.parse import urlparse

Expand All @@ -10,7 +12,7 @@
class KafkaBackend(BroadcastBackend):
def __init__(self, url: str):
self._servers = [urlparse(url).netloc]
self._consumer_channels: typing.Set = set()
self._consumer_channels: set[str] = set()

async def connect(self) -> None:
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
Expand Down
6 changes: 4 additions & 2 deletions broadcaster/_backends/memory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import typing

Expand All @@ -7,10 +9,10 @@

class MemoryBackend(BroadcastBackend):
def __init__(self, url: str):
self._subscribed: typing.Set = set()
self._subscribed: set[str] = set()

async def connect(self) -> None:
self._published: asyncio.Queue = asyncio.Queue()
self._published: asyncio.Queue[Event] = asyncio.Queue()

async def disconnect(self) -> None:
pass
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, url: str):

async def connect(self) -> None:
self._conn = await asyncpg.connect(self._url)
self._listen_queue: asyncio.Queue = asyncio.Queue()
self._listen_queue: asyncio.Queue[Event] = asyncio.Queue()

async def disconnect(self) -> None:
await self._conn.close()
Expand Down
36 changes: 12 additions & 24 deletions broadcaster/_base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
AsyncIterator,
Dict,
Optional,
cast,
)
from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterator, cast
from urllib.parse import urlparse

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -21,11 +15,7 @@ def __init__(self, channel: str, message: str) -> None:
self.message = message

def __eq__(self, other: object) -> bool:
return (
isinstance(other, Event)
and self.channel == other.channel
and self.message == other.message
)
return isinstance(other, Event) and self.channel == other.channel and self.message == other.message

def __repr__(self) -> str:
return f"Event(channel={self.channel!r}, message={self.message!r})"
Expand All @@ -36,14 +26,12 @@ class Unsubscribed(Exception):


class Broadcast:
def __init__(
self, url: Optional[str] = None, *, backend: Optional["BroadcastBackend"] = None
) -> None:
def __init__(self, url: str | None = None, *, backend: BroadcastBackend | None = None) -> None:
assert url or backend, "Either `url` or `backend` must be provided."
self._backend = backend or self._create_backend(cast(str, url))
self._subscribers: Dict[str, Any] = {}
self._subscribers: dict[str, set[asyncio.Queue[Event | None]]] = {}

def _create_backend(self, url: str) -> "BroadcastBackend":
def _create_backend(self, url: str) -> BroadcastBackend:
parsed_url = urlparse(url)
if parsed_url.scheme in ("redis", "rediss"):
from broadcaster._backends.redis import RedisBackend
Expand All @@ -66,7 +54,7 @@ def _create_backend(self, url: str) -> "BroadcastBackend":
return MemoryBackend(url)
raise ValueError(f"Unsupported backend: {parsed_url.scheme}")

async def __aenter__(self) -> "Broadcast":
async def __aenter__(self) -> Broadcast:
await self.connect()
return self

Expand Down Expand Up @@ -94,8 +82,8 @@ async def publish(self, channel: str, message: Any) -> None:
await self._backend.publish(channel, message)

@asynccontextmanager
async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]:
queue: asyncio.Queue = asyncio.Queue()
async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]:
queue: asyncio.Queue[Event | None] = asyncio.Queue()

try:
if not self._subscribers.get(channel):
Expand All @@ -114,10 +102,10 @@ async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]:


class Subscriber:
def __init__(self, queue: asyncio.Queue) -> None:
def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
self._queue = queue

async def __aiter__(self) -> Optional[AsyncGenerator]:
async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None:
try:
while True:
yield await self.get()
Expand Down
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"anyio>=3.4.0,<5",
Expand All @@ -48,14 +49,17 @@ include = [
]

[tool.ruff]
ignore = []
line-length = 120
select = ["E","F","W"]

[tool.ruff.isort]
[tool.ruff.lint]
select = ["E", "F", "I", "FA", "UP"]

[tool.ruff.lint.isort]
combine-as-imports = true

[tool.mypy]
strict = true
python_version = "3.8"
disallow_untyped_defs = true
ignore_missing_imports = true

Expand Down
19 changes: 9 additions & 10 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
-e .[redis,postgres,kafka]

# Documentation
mkdocs==1.3.1
mkdocs==1.5.3
mkdocs-material==9.5.12
mkautodoc==0.2.0
mkdocs-material==8.4.0

# Packaging
build==0.10.0
twine==4.0.1
build==1.1.1
twine==5.0.0

# Tests & Linting
ruff==0.0.277
black==22.6.0
coverage==6.4.4
mypy==0.971
pytest==7.1.2
pytest-asyncio==0.19.0
ruff==0.3.5
coverage==7.4.3
mypy==1.8.0
pytest==8.0.2
pytest-asyncio==0.23.6
4 changes: 2 additions & 2 deletions scripts/check
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ export SOURCE_FILES="broadcaster tests"

set -x

${PREFIX}black --check --diff --target-version=py37 $SOURCE_FILES
${PREFIX}ruff check $SOURCE_FILES
${PREFIX}ruff format --check --diff $SOURCE_FILES
${PREFIX}mypy $SOURCE_FILES
${PREFIX}ruff check $SOURCE_FILES
6 changes: 3 additions & 3 deletions scripts/lint
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/bin/sh -e

export PREFIX=""
if [ -d 'venv' ]; then
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi
export SOURCE_FILES="broadcaster tests"

set -x

${PREFIX}ruff --fix $SOURCE_FILES
${PREFIX}black --target-version=py37 $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES
${PREFIX}ruff check --fix $SOURCE_FILES
19 changes: 9 additions & 10 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import pytest
import typing
from __future__ import annotations

import asyncio
import typing

import pytest

from broadcaster import Broadcast, BroadcastBackend, Event


class CustomBackend(BroadcastBackend):
def __init__(self, url: str):
self._subscribed: typing.Set = set()
self._subscribed: set[str] = set()

async def connect(self) -> None:
self._published: asyncio.Queue = asyncio.Queue()
self._published: asyncio.Queue[Event] = asyncio.Queue()

async def disconnect(self) -> None:
pass
Expand Down Expand Up @@ -54,9 +57,7 @@ async def test_redis():

@pytest.mark.asyncio
async def test_postgres():
async with Broadcast(
"postgres://postgres:postgres@localhost:5432/broadcaster"
) as broadcast:
async with Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
Expand Down Expand Up @@ -95,7 +96,5 @@ async def test_unknown_backend():

@pytest.mark.asyncio
async def test_needs_url_or_backend():
with pytest.raises(
AssertionError, match="Either `url` or `backend` must be provided."
):
with pytest.raises(AssertionError, match="Either `url` or `backend` must be provided."):
Broadcast()
1 change: 1 addition & 0 deletions tests/test_unsubscribe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest

from broadcaster import Broadcast


Expand Down