Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
name: Test Suite

on:
push:
branches: ["master"]
pull_request:
branches: ["master"]

jobs:
tests:
name: "Python ${{ matrix.python-version }}"
runs-on: "ubuntu-latest"

strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]

services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 32181:32181
env:
ZOOKEEPER_CLIENT_PORT: 32181
ALLOW_ANONYMOUS_LOGIN: yes
options: --hostname zookeeper
kafka:
image: confluentinc/cp-kafka
ports:
- 9092:9092
- 29092:29092
env:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:29092,PLAINTEXT://localhost:9092"
KAFKA_BROKER_ID: 1
ALLOW_PLAINTEXT_LISTENER: yes
options: --hostname kafka
redis:
image: redis:alpine
ports:
- 6379:6379
postgres:
image: postgres:12
env:
POSTGRES_DB: broadcaster
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: trust
POSTGRES_USER: postgres
ports:
- 5432:5432

steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-python@v2"
with:
python-version: "${{ matrix.python-version }}"
- name: "Install dependencies"
run: "scripts/install"
- name: "Run linting checks"
run: "scripts/check"
- name: "Build package & docs"
run: "scripts/build"
- name: "Run tests"
run: "scripts/test"
- name: "Enforce coverage"
run: "scripts/coverage"
1 change: 1 addition & 0 deletions broadcaster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._base import Broadcast, Event

__version__ = "0.2.0"
__all__ = ["Broadcast", "Event"]
10 changes: 6 additions & 4 deletions broadcaster/_backends/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import typing
from typing import Any

from .._base import Event


class BroadcastBackend:
def __init__(self, url):
def __init__(self, url: str) -> None:
raise NotImplementedError()

async def connect(self) -> None:
Expand All @@ -17,8 +19,8 @@ async def subscribe(self, group: str) -> None:
async def unsubscribe(self, group: str) -> None:
raise NotImplementedError()

async def publish(self, channel: str, message: typing.Any) -> None:
async def publish(self, channel: str, message: Any) -> None:
raise NotImplementedError()

async def next_published(self) -> typing.Tuple[str, typing.Any]:
async def next_published(self) -> Event:
raise NotImplementedError()
3 changes: 2 additions & 1 deletion broadcaster/_backends/memory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
import typing
from .base import BroadcastBackend

from .._base import Event
from .base import BroadcastBackend


class MemoryBackend(BroadcastBackend):
Expand Down
7 changes: 5 additions & 2 deletions broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
from typing import Any

import asyncpg
from .base import BroadcastBackend

from .._base import Event
from .base import BroadcastBackend


class PostgresBackend(BroadcastBackend):
Expand All @@ -24,7 +27,7 @@ async def unsubscribe(self, channel: str) -> None:
async def publish(self, channel: str, message: str) -> None:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)

def _listener(self, *args) -> None:
def _listener(self, *args: Any) -> None:
connection, pid, channel, payload = args
event = Event(channel=channel, message=payload)
self._listen_queue.put_nowait(event)
Expand Down
6 changes: 4 additions & 2 deletions broadcaster/_backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio_redis
import typing
from urllib.parse import urlparse
from .base import BroadcastBackend

import asyncio_redis

from .._base import Event
from .base import BroadcastBackend


class RedisBackend(BroadcastBackend):
Expand Down
47 changes: 27 additions & 20 deletions broadcaster/_base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import asyncio
import typing
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional
from urllib.parse import urlparse


class Event:
def __init__(self, channel, message):
def __init__(self, channel: str, message: str) -> None:
self.channel = channel
self.message = message

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
return (
isinstance(other, Event)
and self.channel == other.channel
and self.message == other.message
)

def __repr__(self):
return f'Event(channel={self.channel!r}, message={self.message!r})'
def __repr__(self) -> str:
return f"Event(channel={self.channel!r}, message={self.message!r})"


class Unsubscribed(Exception):
Expand All @@ -26,29 +26,36 @@ class Unsubscribed(Exception):

class Broadcast:
def __init__(self, url: str):
from broadcaster._backends.base import BroadcastBackend

parsed_url = urlparse(url)
self._subscribers = {}
if parsed_url.scheme == 'redis':
from ._backends.redis import RedisBackend
self._backend: BroadcastBackend
self._subscribers: Dict[str, Any] = {}
if parsed_url.scheme == "redis":
from broadcaster._backends.redis import RedisBackend

self._backend = RedisBackend(url)

elif parsed_url.scheme in ('postgres', 'postgresql'):
from ._backends.postgres import PostgresBackend
elif parsed_url.scheme in ("postgres", "postgresql"):
from broadcaster._backends.postgres import PostgresBackend

self._backend = PostgresBackend(url)

if parsed_url.scheme == 'kafka':
from ._backends.kafka import KafkaBackend
if parsed_url.scheme == "kafka":
from broadcaster._backends.kafka import KafkaBackend

self._backend = KafkaBackend(url)

elif parsed_url.scheme == 'memory':
from ._backends.memory import MemoryBackend
elif parsed_url.scheme == "memory":
from broadcaster._backends.memory import MemoryBackend

self._backend = MemoryBackend(url)

async def __aenter__(self) -> 'Broadcast':
async def __aenter__(self) -> "Broadcast":
await self.connect()
return self

async def __aexit__(self, *args, **kwargs) -> None:
async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
await self.disconnect()

async def connect(self) -> None:
Expand All @@ -68,11 +75,11 @@ async def _listener(self) -> None:
for queue in list(self._subscribers.get(event.channel, [])):
await queue.put(event)

async def publish(self, channel: str, message: typing.Any) -> None:
async def publish(self, channel: str, message: Any) -> None:
await self._backend.publish(channel, message)

@asynccontextmanager
async def subscribe(self, channel: str) -> 'Subscriber':
async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]:
queue: asyncio.Queue = asyncio.Queue()

try:
Expand All @@ -93,10 +100,10 @@ async def subscribe(self, channel: str) -> 'Subscriber':


class Subscriber:
def __init__(self, queue):
def __init__(self, queue: asyncio.Queue) -> None:
self._queue = queue

async def __aiter__(self):
async def __aiter__(self) -> Optional[AsyncGenerator]:
try:
while True:
yield await self.get()
Expand Down
25 changes: 25 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-e .[redis,postgres,kafka]

# Documentation
mkdocs
mkautodoc
mkdocs-material

# Packaging
twine
wheel

# Tests & Linting
autoflake
black==20.8b1
coverage==5.3
flake8
flake8-bugbear
flake8-pie==0.5.*
isort==5.*
mypy
pytest==5.*
pytest-asyncio
pytest-trio
trio
trio-typing
13 changes: 13 additions & 0 deletions scripts/build
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh -e

if [ -d 'venv' ] ; then
PREFIX="venv/bin/"
else
PREFIX=""
fi

set -x

${PREFIX}python setup.py sdist bdist_wheel
${PREFIX}twine check dist/*
# ${PREFIX}mkdocs build
14 changes: 14 additions & 0 deletions scripts/check
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/sh -e

export PREFIX=""
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi
export SOURCE_FILES="broadcaster tests"

set -x

${PREFIX}black --check --diff --target-version=py37 $SOURCE_FILES
${PREFIX}flake8 $SOURCE_FILES
${PREFIX}mypy $SOURCE_FILES
${PREFIX}isort --check --diff --project=httpx $SOURCE_FILES
11 changes: 11 additions & 0 deletions scripts/coverage
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/sh -e

export PREFIX=""
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi
export SOURCE_FILES="broadcaster tests"

set -x

${PREFIX}coverage report --show-missing --skip-covered --fail-under=100
19 changes: 19 additions & 0 deletions scripts/install
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh -e

# Use the Python executable provided from the `-p` option, or a default.
[ "$1" = "-p" ] && PYTHON=$2 || PYTHON="python3"

REQUIREMENTS="requirements.txt"
VENV="venv"

set -x

if [ -z "$GITHUB_ACTIONS" ]; then
"$PYTHON" -m venv "$VENV"
PIP="$VENV/bin/pip"
else
PIP="pip"
fi

"$PIP" install -r "$REQUIREMENTS"
"$PIP" install -e .
13 changes: 13 additions & 0 deletions scripts/lint
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh -e

export PREFIX=""
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi
export SOURCE_FILES="httpx tests"

set -x

${PREFIX}autoflake --in-place --recursive $SOURCE_FILES
${PREFIX}isort --project=httpx $SOURCE_FILES
${PREFIX}black --target-version=py37 $SOURCE_FILES
18 changes: 18 additions & 0 deletions scripts/test
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/sh

export PREFIX=""
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi

set -ex

if [ -z $GITHUB_ACTIONS ]; then
scripts/check
fi

${PREFIX}coverage run -m pytest

if [ -z $GITHUB_ACTIONS ]; then
scripts/coverage
fi
24 changes: 24 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[flake8]
ignore = W503, E203, B305
max-line-length = 120

[mypy]
disallow_untyped_defs = True
ignore_missing_imports = True

[mypy-tests.*]
disallow_untyped_defs = False
check_untyped_defs = True

[tool:isort]
profile = black
combine_as_imports = True

[tool:pytest]
addopts = -rxXs
markers =
copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup

[coverage:run]
omit = venv/*
include = httpx/*, tests/*
Loading