A more flexible Python client for the PGMQ Postgres extension, built on the SQLAlchemy ORM and supporting both async and sync engines and sessionmakers, or construction directly from a DSN.
- Supports async and sync engines and sessionmakers, or construction from a DSN (see the sketch after this list).
- Supports all PostgreSQL DBAPIs supported by SQLAlchemy, e.g. psycopg, psycopg2, asyncpg. See SQLAlchemy PostgreSQL Dialects.
- Transaction-friendly operations via the op module for combining PGMQ with your business logic in the same transaction.
- Fully tested across all supported DBAPIs in both async and sync modes.
- Battle-tested with real-world FastAPI Pub/Sub examples and corresponding tests.
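As a quick sketch of the construction styles above, assuming the PGMQueue constructor accepts engine and session_maker keyword arguments as alternatives to dsn (verify the exact parameter names in the documentation):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pgmq_sqlalchemy import PGMQueue

dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

# built directly from a DSN (used throughout the examples below)
pgmq = PGMQueue(dsn=dsn)

# built from an existing sync engine (assumed keyword: `engine`)
engine = create_engine(dsn)
pgmq_from_engine = PGMQueue(engine=engine)

# built from an existing sessionmaker (assumed keyword: `session_maker`)
SessionLocal = sessionmaker(bind=engine)
pgmq_from_sessionmaker = PGMQueue(session_maker=SessionLocal)
```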
Install with pip:
```bash
pip install pgmq-sqlalchemy
```

Install with additional DBAPI packages:

```bash
pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"Prerequisites: Postgres with PGMQ extension installed.
For quick setup:
```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

For more information, see PGMQ.
Note
Check the pgmq-sqlalchemy documentation for more examples and detailed usage.
For dispatcher.py:
```python
from typing import List
from pgmq_sqlalchemy import PGMQueue
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue('my_queue')
msg = {'key': 'value', 'key2': 'value2'}
msg_id: int = pgmq.send('my_queue', msg)
# could also send a list of messages
msg_ids: List[int] = pgmq.send_batch('my_queue', [msg, msg])
```
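send can also defer delivery. A one-line sketch, assuming an optional delay argument (in seconds) that mirrors PGMQ's pgmq.send SQL function; check the documentation for the actual signature:

```python
# keep the message invisible to consumers for 10 seconds (`delay` is an assumption)
msg_id: int = pgmq.send('my_queue', msg, delay=10)
```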
For consumer.py:
```python
from typing import List

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
# read a single message
msg: Message = pgmq.read('my_queue')
# read a batch of messages
msgs: List[Message] = pgmq.read_batch('my_queue', 10)
```
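Note that read only hides a message for its visibility timeout; it does not remove it from the queue. A minimal acknowledgement sketch, assuming delete and archive methods that mirror PGMQ's SQL functions of the same names, that read returns None on an empty queue, and that Message exposes msg_id and message fields:

```python
msg: Message = pgmq.read('my_queue')
if msg is not None:
    print(msg.message)  # your processing logic goes here
    # permanently remove the processed message ...
    pgmq.delete('my_queue', msg.msg_id)
    # ... or keep a copy by archiving it instead:
    # pgmq.archive('my_queue', msg.msg_id)
```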
For monitor.py:
```python
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics
postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'
pgmq = PGMQueue(dsn=postgres_dsn)
# get queue metrics
metrics: QueueMetrics = pgmq.metrics('my_queue')
print(metrics.queue_length)
print(metrics.total_messages)
```
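Building on the metrics above, a minimal polling sketch that warns when the backlog grows; the threshold and interval are arbitrary illustration values:

```python
import time

while True:
    metrics: QueueMetrics = pgmq.metrics('my_queue')
    if metrics.queue_length > 1000:  # arbitrary alert threshold
        print(f'backlog warning: {metrics.queue_length} pending messages')
    time.sleep(30)  # poll every 30 seconds
```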
Use the op module to combine PGMQ operations with your business logic in a single transaction:
```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from pgmq_sqlalchemy import op
engine = create_engine('postgresql://postgres:postgres@localhost:5432/postgres')
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    try:
        # Create queue
        op.create_queue('orders_queue', session=session, commit=False)
        # Insert order into your database
        session.execute(
            text("INSERT INTO orders (user_id, total) VALUES (:user_id, :total)"),
            {"user_id": 123, "total": 99.99}
        )
        # Send message to queue
        op.send(
            'orders_queue',
            {'user_id': 123, 'action': 'process_order'},
            session=session,
            commit=False
        )
        # Commit everything together
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Transaction failed: {e}")
```

See the Transaction Usage Documentation for more examples.
See the FastAPI Pub/Sub Example for a complete example of using pgmq-sqlalchemy in a FastAPI application with asynchronous message consumption and tests.
Issues and pull requests are welcome!
See the Development section in the online documentation or CONTRIBUTING.md for more information.
- Alembic-compatible migration scripts for PGMQ extension and schema setup, upgrade, and downgrade.
- Compatibility tests across different PGMQ versions.
- More examples
- Streamline the contributing process with a one-step setup script
- Mypy strict type checking
- Enable more ruff rules
- Drop Python 3.9 support in the next minor release