Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ The HTML template for the front end [is available here](https://github.com/encod
* `pip install broadcaster`
* `pip install broadcaster[redis]`
* `pip install broadcaster[postgres]`
* `pip install broadcaster[kafka]`

## Available backends

* `Broadcast('memory://')`
* `Broadcast("redis://localhost:6379")`
* `Broadcast("postgres://localhost:5432/hostedapi")`
* `Broadcast("kafka://localhost:9092")`

## Where next?

Expand Down
39 changes: 39 additions & 0 deletions broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import typing
from urllib.parse import urlparse

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

from .._base import Event
from .base import BroadcastBackend


class KafkaBackend(BroadcastBackend):
    """Broadcast backend that publishes and consumes events via Apache Kafka.

    Channels map directly onto Kafka topics: publishing to a channel sends a
    message to the topic of the same name, and subscribing adds that topic to
    the single shared consumer's subscription set.
    """

    def __init__(self, url: str):
        # urlparse("kafka://host:9092").netloc -> "host:9092", which is the
        # bootstrap-server form aiokafka expects.
        self._servers = [urlparse(url).netloc]
        # Topics currently subscribed to. aiokafka's subscribe() replaces the
        # whole subscription set, so we must track it ourselves.
        self._consumer_channels: typing.Set[str] = set()

    async def connect(self) -> None:
        """Create and start the shared producer and consumer."""
        loop = asyncio.get_event_loop()
        # NOTE(review): the explicit loop= argument was deprecated and later
        # removed in newer aiokafka releases — confirm the pinned version.
        self._producer = AIOKafkaProducer(loop=loop, bootstrap_servers=self._servers)
        self._consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self._servers)
        await self._producer.start()
        await self._consumer.start()

    async def disconnect(self) -> None:
        """Stop the producer and consumer, flushing pending messages."""
        await self._producer.stop()
        await self._consumer.stop()

    async def subscribe(self, channel: str) -> None:
        """Add *channel* to the consumer's topic subscriptions."""
        self._consumer_channels.add(channel)
        self._consumer.subscribe(topics=self._consumer_channels)

    async def unsubscribe(self, channel: str) -> None:
        """Remove *channel* from the consumer's topic subscriptions.

        Fix: the previous implementation called ``consumer.unsubscribe()``,
        which silently dropped *every* topic subscription (breaking other
        channels still in use) and never removed the channel from
        ``self._consumer_channels``. It also awaited ``unsubscribe()``, which
        — like ``subscribe()`` above — is a plain synchronous method on
        ``AIOKafkaConsumer``.
        """
        self._consumer_channels.discard(channel)
        if self._consumer_channels:
            # Re-subscribe with the remaining channels only.
            self._consumer.subscribe(topics=self._consumer_channels)
        else:
            # Nothing left: clear the subscription entirely.
            self._consumer.unsubscribe()

    async def publish(self, channel: str, message: typing.Any) -> None:
        """Send *message* to the topic named *channel* and wait for the ack."""
        # Kafka deals in bytes; the broadcaster API deals in str.
        await self._producer.send_and_wait(channel, message.encode("utf8"))

    async def next_published(self) -> Event:
        """Block until the next message arrives on any subscribed topic."""
        message = await self._consumer.getone()
        return Event(channel=message.topic, message=message.value.decode("utf8"))
4 changes: 2 additions & 2 deletions broadcaster/_backends/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

class MemoryBackend(BroadcastBackend):
def __init__(self, url: str):
self._subscribed = set()
self._published = asyncio.Queue()
self._subscribed: typing.Set = set()
self._published: asyncio.Queue = asyncio.Queue()

async def connect(self) -> None:
pass
Expand Down
4 changes: 1 addition & 3 deletions broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import asyncio
import asyncpg
import typing
from urllib.parse import ParseResult
from .base import BroadcastBackend
from .._base import Event


class PostgresBackend(BroadcastBackend):
def __init__(self, url: str):
self._url = url
self._listen_queue = asyncio.Queue()
self._listen_queue: asyncio.Queue = asyncio.Queue()

async def connect(self) -> None:
self._conn = await asyncpg.connect(self._url)
Expand Down
6 changes: 5 additions & 1 deletion broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __init__(self, url: str):
from ._backends.postgres import PostgresBackend
self._backend = PostgresBackend(url)

if parsed_url.scheme == 'kafka':
from ._backends.kafka import KafkaBackend
self._backend = KafkaBackend(url)

elif parsed_url.scheme == 'memory':
from ._backends.memory import MemoryBackend
self._backend = MemoryBackend(url)
Expand Down Expand Up @@ -69,7 +73,7 @@ async def publish(self, channel: str, message: typing.Any) -> None:

@asynccontextmanager
async def subscribe(self, channel: str) -> 'Subscriber':
queue = asyncio.Queue()
queue: asyncio.Queue = asyncio.Queue()

try:
if not self._subscribers.get(channel):
Expand Down
38 changes: 38 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Development services for the broadcaster backends.
# Start everything with `docker-compose up`, or a single backend with
# e.g. `docker-compose up kafka` (see scripts/start).
version: '3'
services:
  # Zookeeper is required by the Kafka broker (cp-kafka).
  zookeeper:
    image: "confluentinc/cp-zookeeper"
    hostname: zookeeper
    ports:
      - 32181:32181
    environment:
      - ZOOKEEPER_CLIENT_PORT=32181
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: confluentinc/cp-kafka
    hostname: kafka
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
      # Single-broker dev setup: the offsets topic cannot replicate beyond 1.
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # NOTE(review): host clients are advertised localhost:29092 while the
      # README and tests connect to kafka://localhost:9092 — confirm which
      # port host connections are expected to use.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT_HOST://localhost:29092,PLAINTEXT://kafka:9092
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
  redis:
    image: "redis:alpine"
    ports:
      - 6379:6379
  postgres:
    image: "postgres:12"
    environment:
      - POSTGRES_DB=hostedapi
      - POSTGRES_PASSWORD=postgres
      # Trust auth: no password required — development only.
      - POSTGRES_HOST_AUTH_METHOD=trust
      - POSTGRES_USER=postgres
    ports:
      - 5432:5432
24 changes: 24 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Setup

Install python dependencies in your virtualenv

```bash
pip install -r requirements.txt
```

Run the example with the in-memory backend.

```bash
uvicorn example.app:app
```

You can also install broadcaster locally using `pip install -e .`.

To run the app with a different backend, set the `BROADCAST_URL`
environment variable and start the corresponding docker service.

| Backend | Env | Service command |
| -------- | ---------------------------------------------------------- | ---------------------------- |
| kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` |
| redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` |
| postgres | `export BROADCAST_URL=postgres://localhost:5432/hostedapi` | `docker-compose up postgres` |
13 changes: 9 additions & 4 deletions example.py → example/app.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from broadcaster import Broadcast
import os

import uvicorn
from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates

from broadcaster import Broadcast

BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://")

broadcast = Broadcast("redis://localhost:6379")
templates = Jinja2Templates("templates")
broadcast = Broadcast(BROADCAST_URL)
templates = Jinja2Templates("example/templates")


async def homepage(request):
Expand Down Expand Up @@ -36,7 +41,7 @@ async def chatroom_ws_sender(websocket):

routes = [
Route("/", homepage),
WebSocketRoute("/", chatroom_ws, name='chatroom_ws'),
WebSocketRoute("/", chatroom_ws, name="chatroom_ws"),
]


Expand Down
4 changes: 4 additions & 0 deletions example/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
uvicorn
starlette
jinja2
broadcaster[redis,postgres,kafka]
File renamed without changes.
14 changes: 14 additions & 0 deletions scripts/start
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/sh -e
# Start the docker-compose backend services for local development/tests.
# Usage: scripts/start [service]
# Accepted values: postgres, kafka, redis
# If no variable provided all services will start
if [ -n "$1" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then
echo "Not a valid value. Choose one or none:
kafka
redis
postgres ";
exit 1;
fi
fi

# $1 is intentionally unquoted: when no argument is given it expands to
# nothing and docker-compose brings up every service.
docker-compose up $1
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_packages(package):
extras_require={
"redis": ["asyncio-redis"],
"postgres": ["asyncpg"],
"kafka": ["aiokafka"]
},
classifiers=[
"Development Status :: 3 - Alpha",
Expand Down
Empty file added tests/__init__.py
Empty file.
12 changes: 11 additions & 1 deletion tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@ async def test_redis():

@pytest.mark.asyncio
async def test_postgres():
async with Broadcast('postgres://localhost:5432/hostedapi') as broadcast:
async with Broadcast('postgres://postgres:postgres@localhost:5432/hostedapi') as broadcast:
async with broadcast.subscribe('chatroom') as subscriber:
await broadcast.publish('chatroom', 'hello')
event = await subscriber.get()
assert event.channel == 'chatroom'
assert event.message == 'hello'


@pytest.mark.asyncio
async def test_kafka():
async with Broadcast('kafka://localhost:9092') as broadcast:
async with broadcast.subscribe('chatroom') as subscriber:
await broadcast.publish('chatroom', 'hello')
event = await subscriber.get()
Expand Down