diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml new file mode 100644 index 0000000..458bcad --- /dev/null +++ b/.github/workflows/test-suite.yml @@ -0,0 +1,69 @@ +--- +name: Test Suite + +on: + push: + branches: ["master"] + pull_request: + branches: ["master"] + +jobs: + tests: + name: "Python ${{ matrix.python-version }}" + runs-on: "ubuntu-latest" + + strategy: + matrix: + python-version: ["3.7", "3.8", "3.9"] + + services: + zookeeper: + image: confluentinc/cp-zookeeper + ports: + - 32181:32181 + env: + ZOOKEEPER_CLIENT_PORT: 32181 + ALLOW_ANONYMOUS_LOGIN: yes + options: --hostname zookeeper + kafka: + image: confluentinc/cp-kafka + ports: + - 9092:9092 + - 29092:29092 + env: + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:29092,PLAINTEXT://localhost:9092" + KAFKA_BROKER_ID: 1 + ALLOW_PLAINTEXT_LISTENER: yes + options: --hostname kafka + redis: + image: redis:alpine + ports: + - 6379:6379 + postgres: + image: postgres:12 + env: + POSTGRES_DB: broadcaster + POSTGRES_PASSWORD: postgres + POSTGRES_HOST_AUTH_METHOD: trust + POSTGRES_USER: postgres + ports: + - 5432:5432 + + steps: + - uses: "actions/checkout@v2" + - uses: "actions/setup-python@v2" + with: + python-version: "${{ matrix.python-version }}" + - name: "Install dependencies" + run: "scripts/install" + - name: "Run linting checks" + run: "scripts/check" + - name: "Build package & docs" + run: "scripts/build" + - name: "Run tests" + run: "scripts/test" + - name: "Enforce coverage" + run: "scripts/coverage" \ No newline at end of file diff --git a/broadcaster/__init__.py b/broadcaster/__init__.py index 49a4d37..edc56d6 100644 --- a/broadcaster/__init__.py +++ b/broadcaster/__init__.py @@ -1,3 +1,4 @@ from ._base import Broadcast, Event __version__ = "0.2.0" +__all__ = ["Broadcast", "Event"] diff --git a/broadcaster/_backends/base.py b/broadcaster/_backends/base.py index b2cf506..1c65b86 100644 --- a/broadcaster/_backends/base.py +++ b/broadcaster/_backends/base.py @@ -1,8 +1,10 @@ -import typing +from typing import Any + +from .._base import Event class BroadcastBackend: - def __init__(self, url): + def __init__(self, url: str) -> None: raise NotImplementedError() async def connect(self) -> None: @@ -17,8 +19,8 @@ async def subscribe(self, group: str) -> None: async def unsubscribe(self, group: str) -> None: raise NotImplementedError() - async def publish(self, channel: str, message: typing.Any) -> None: + async def publish(self, channel: str, message: Any) -> None: raise NotImplementedError() - async def next_published(self) -> typing.Tuple[str, typing.Any]: + async def next_published(self) -> Event: raise NotImplementedError() diff --git a/broadcaster/_backends/memory.py b/broadcaster/_backends/memory.py index 93ac36a..5a9fa53 100644 --- a/broadcaster/_backends/memory.py +++ b/broadcaster/_backends/memory.py @@ -1,7 +1,8 @@ import asyncio import typing -from .base import BroadcastBackend + from .._base import Event +from .base import BroadcastBackend class MemoryBackend(BroadcastBackend): diff --git a/broadcaster/_backends/postgres.py b/broadcaster/_backends/postgres.py index f36ad44..47ef4f6 100644 --- a/broadcaster/_backends/postgres.py +++ b/broadcaster/_backends/postgres.py @@ -1,7 +1,10 @@ import asyncio +from typing import Any + import asyncpg -from .base import BroadcastBackend + from .._base import Event +from .base import BroadcastBackend class PostgresBackend(BroadcastBackend): @@ -24,7 +27,7 @@ async def unsubscribe(self, channel: str) -> None: async def publish(self, channel: str, message: str) -> None: await self._conn.execute("SELECT pg_notify($1, $2);", channel, message) - def _listener(self, *args) -> None: + def _listener(self, *args: Any) -> None: connection, pid, channel, payload = args event = Event(channel=channel, message=payload) self._listen_queue.put_nowait(event) diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 096a033..b545bad 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -1,8 +1,10 @@ -import asyncio_redis import typing from urllib.parse import urlparse -from .base import BroadcastBackend + +import asyncio_redis + from .._base import Event +from .base import BroadcastBackend class RedisBackend(BroadcastBackend): diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 44ec030..c58cb1d 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -1,23 +1,23 @@ import asyncio -import typing from contextlib import asynccontextmanager +from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional from urllib.parse import urlparse class Event: - def __init__(self, channel, message): + def __init__(self, channel: str, message: str) -> None: self.channel = channel self.message = message - def __eq__(self, other): + def __eq__(self, other: object) -> bool: return ( isinstance(other, Event) and self.channel == other.channel and self.message == other.message ) - def __repr__(self): - return f'Event(channel={self.channel!r}, message={self.message!r})' + def __repr__(self) -> str: + return f"Event(channel={self.channel!r}, message={self.message!r})" class Unsubscribed(Exception): @@ -26,29 +26,36 @@ class Unsubscribed(Exception): class Broadcast: def __init__(self, url: str): + from broadcaster._backends.base import BroadcastBackend + parsed_url = urlparse(url) - self._subscribers = {} - if parsed_url.scheme == 'redis': - from ._backends.redis import RedisBackend + self._backend: BroadcastBackend + self._subscribers: Dict[str, Any] = {} + if parsed_url.scheme == "redis": + from broadcaster._backends.redis import RedisBackend + self._backend = RedisBackend(url) - elif parsed_url.scheme in ('postgres', 'postgresql'): - from ._backends.postgres import PostgresBackend + elif parsed_url.scheme in ("postgres", "postgresql"): + from broadcaster._backends.postgres import PostgresBackend + self._backend = PostgresBackend(url) - if parsed_url.scheme == 'kafka': - from ._backends.kafka import KafkaBackend + if parsed_url.scheme == "kafka": + from broadcaster._backends.kafka import KafkaBackend + self._backend = KafkaBackend(url) - elif parsed_url.scheme == 'memory': - from ._backends.memory import MemoryBackend + elif parsed_url.scheme == "memory": + from broadcaster._backends.memory import MemoryBackend + self._backend = MemoryBackend(url) - async def __aenter__(self) -> 'Broadcast': + async def __aenter__(self) -> "Broadcast": await self.connect() return self - async def __aexit__(self, *args, **kwargs) -> None: + async def __aexit__(self, *args: Any, **kwargs: Any) -> None: await self.disconnect() async def connect(self) -> None: @@ -68,11 +75,11 @@ async def _listener(self) -> None: for queue in list(self._subscribers.get(event.channel, [])): await queue.put(event) - async def publish(self, channel: str, message: typing.Any) -> None: + async def publish(self, channel: str, message: Any) -> None: await self._backend.publish(channel, message) @asynccontextmanager - async def subscribe(self, channel: str) -> 'Subscriber': + async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]: queue: asyncio.Queue = asyncio.Queue() try: @@ -93,10 +100,10 @@ async def subscribe(self, channel: str) -> 'Subscriber': class Subscriber: - def __init__(self, queue): + def __init__(self, queue: asyncio.Queue) -> None: self._queue = queue - async def __aiter__(self): + async def __aiter__(self) -> Optional[AsyncGenerator]: try: while True: yield await self.get() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..811910c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,25 @@ +-e .[redis,postgres,kafka] + +# Documentation +mkdocs +mkautodoc +mkdocs-material + +# Packaging +twine +wheel + +# Tests & Linting +autoflake +black==20.8b1 +coverage==5.3 +flake8 +flake8-bugbear +flake8-pie==0.5.* +isort==5.* +mypy +pytest==5.* +pytest-asyncio +pytest-trio +trio +trio-typing diff --git a/scripts/build b/scripts/build new file mode 100755 index 0000000..7d327b5 --- /dev/null +++ b/scripts/build @@ -0,0 +1,13 @@ +#!/bin/sh -e + +if [ -d 'venv' ] ; then + PREFIX="venv/bin/" +else + PREFIX="" +fi + +set -x + +${PREFIX}python setup.py sdist bdist_wheel +${PREFIX}twine check dist/* +# ${PREFIX}mkdocs build diff --git a/scripts/check b/scripts/check new file mode 100755 index 0000000..77acf65 --- /dev/null +++ b/scripts/check @@ -0,0 +1,14 @@ +#!/bin/sh -e + +export PREFIX="" +if [ -d 'venv' ] ; then + export PREFIX="venv/bin/" +fi +export SOURCE_FILES="broadcaster tests" + +set -x + +${PREFIX}black --check --diff --target-version=py37 $SOURCE_FILES +${PREFIX}flake8 $SOURCE_FILES +${PREFIX}mypy $SOURCE_FILES +${PREFIX}isort --check --diff --project=httpx $SOURCE_FILES diff --git a/scripts/coverage b/scripts/coverage new file mode 100755 index 0000000..73a2198 --- /dev/null +++ b/scripts/coverage @@ -0,0 +1,11 @@ +#!/bin/sh -e + +export PREFIX="" +if [ -d 'venv' ] ; then + export PREFIX="venv/bin/" +fi +export SOURCE_FILES="broadcaster tests" + +set -x + +${PREFIX}coverage report --show-missing --skip-covered --fail-under=100 diff --git a/scripts/install b/scripts/install new file mode 100755 index 0000000..65885a7 --- /dev/null +++ b/scripts/install @@ -0,0 +1,19 @@ +#!/bin/sh -e + +# Use the Python executable provided from the `-p` option, or a default. +[ "$1" = "-p" ] && PYTHON=$2 || PYTHON="python3" + +REQUIREMENTS="requirements.txt" +VENV="venv" + +set -x + +if [ -z "$GITHUB_ACTIONS" ]; then + "$PYTHON" -m venv "$VENV" + PIP="$VENV/bin/pip" +else + PIP="pip" +fi + +"$PIP" install -r "$REQUIREMENTS" +"$PIP" install -e . diff --git a/scripts/lint b/scripts/lint new file mode 100755 index 0000000..81851c6 --- /dev/null +++ b/scripts/lint @@ -0,0 +1,13 @@ +#!/bin/sh -e + +export PREFIX="" +if [ -d 'venv' ] ; then + export PREFIX="venv/bin/" +fi +export SOURCE_FILES="httpx tests" + +set -x + +${PREFIX}autoflake --in-place --recursive $SOURCE_FILES +${PREFIX}isort --project=httpx $SOURCE_FILES +${PREFIX}black --target-version=py37 $SOURCE_FILES diff --git a/scripts/test b/scripts/test new file mode 100755 index 0000000..f7f32ad --- /dev/null +++ b/scripts/test @@ -0,0 +1,18 @@ +#!/bin/sh + +export PREFIX="" +if [ -d 'venv' ] ; then + export PREFIX="venv/bin/" +fi + +set -ex + +if [ -z $GITHUB_ACTIONS ]; then + scripts/check +fi + +${PREFIX}coverage run -m pytest + +if [ -z $GITHUB_ACTIONS ]; then + scripts/coverage +fi diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..c860d81 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,24 @@ +[flake8] +ignore = W503, E203, B305 +max-line-length = 120 + +[mypy] +disallow_untyped_defs = True +ignore_missing_imports = True + +[mypy-tests.*] +disallow_untyped_defs = False +check_untyped_defs = True + +[tool:isort] +profile = black +combine_as_imports = True + +[tool:pytest] +addopts = -rxXs +markers = + copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup + +[coverage:run] +omit = venv/* +include = httpx/*, tests/* diff --git a/setup.py b/setup.py index 2cd87a3..668b715 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,8 @@ def get_packages(package): extras_require={ "redis": ["asyncio-redis"], "postgres": ["asyncpg"], - "kafka": ["aiokafka"] + "kafka": ["aiokafka"], + "test": ["pytest", "pytest-asyncio"] }, classifiers=[ "Development Status :: 3 - Alpha", diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 7e0c8c0..61e7295 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -1,43 +1,45 @@ -import asyncio import pytest + from broadcaster import Broadcast @pytest.mark.asyncio async def test_memory(): - async with Broadcast('memory://') as broadcast: - async with broadcast.subscribe('chatroom') as subscriber: - await broadcast.publish('chatroom', 'hello') + async with Broadcast("memory://") as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") event = await subscriber.get() - assert event.channel == 'chatroom' - assert event.message == 'hello' + assert event.channel == "chatroom" + assert event.message == "hello" @pytest.mark.asyncio async def test_redis(): - async with Broadcast('redis://localhost:6379') as broadcast: - async with broadcast.subscribe('chatroom') as subscriber: - await broadcast.publish('chatroom', 'hello') + async with Broadcast("redis://localhost:6379") as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") event = await subscriber.get() - assert event.channel == 'chatroom' - assert event.message == 'hello' + assert event.channel == "chatroom" + assert event.message == "hello" @pytest.mark.asyncio async def test_postgres(): - async with Broadcast('postgres://postgres:postgres@localhost:5432/broadcaster') as broadcast: - async with broadcast.subscribe('chatroom') as subscriber: - await broadcast.publish('chatroom', 'hello') + async with Broadcast( + "postgres://postgres:postgres@localhost:5432/broadcaster" + ) as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") event = await subscriber.get() - assert event.channel == 'chatroom' - assert event.message == 'hello' + assert event.channel == "chatroom" + assert event.message == "hello" @pytest.mark.asyncio async def test_kafka(): - async with Broadcast('kafka://localhost:9092') as broadcast: - async with broadcast.subscribe('chatroom') as subscriber: - await broadcast.publish('chatroom', 'hello') + async with Broadcast("kafka://localhost:9092") as broadcast: + async with broadcast.subscribe("chatroom") as subscriber: + await broadcast.publish("chatroom", "hello") event = await subscriber.get() - assert event.channel == 'chatroom' - assert event.message == 'hello' + assert event.channel == "chatroom" + assert event.message == "hello"