Skip to content

ddc/ddcDatabases

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

69 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ddcDatabases

Donate License: MIT PyPi PyPI Downloads codecov CI/CD Pipeline Quality Gate Status
Build Status Code style: black Python

Support me on GitHub

A Python library for database connections and ORM queries with support for multiple database engines including SQLite, PostgreSQL, MySQL, MSSQL, Oracle, and MongoDB.

Table of Contents

Features

  • Multiple Database Support: SQLite, PostgreSQL, MySQL, MSSQL, Oracle, and MongoDB
  • Sync and Async Support: Both synchronous and asynchronous operations
  • Environment Configuration: Optional parameters with .env file fallback
  • SQLAlchemy Integration: Built on top of SQLAlchemy ORM
  • Connection Pooling: Configurable connection pooling for better performance

Default Session Settings

  • autoflush = False
  • expire_on_commit = False
  • echo = False

Note: All constructor parameters are optional and fall back to .env file variables.

Installation

Basic Installation (SQLite only)

pip install ddcDatabases

Note: The basic installation includes only SQlite. Database-specific drivers are optional extras that you can install as needed.

Database-Specific Installations

Install only the database drivers you need:

# All database drivers (recommended for development)
pip install ddcDatabases[all]

# SQL Server / MSSQL
pip install ddcDatabases[mssql]

# MySQL / MariaDB
pip install ddcDatabases[mysql]

# PostgreSQL
pip install ddcDatabases[pgsql]

# Oracle Database
pip install ddcDatabases[oracle]

# MongoDB
pip install ddcDatabases[mongodb]

# Multiple databases (example)
pip install ddcDatabases[mysql,pgsql,mongodb]

Available Database Extras:

  • all - All database drivers
  • mssql - Microsoft SQL Server (pyodbc, aioodbc)
  • mysql - MySQL/MariaDB (pymysql, aiomysql)
  • pgsql - PostgreSQL (psycopg2-binary, asyncpg)
  • oracle - Oracle Database (cx-oracle)
  • mongodb - MongoDB (pymongo)

Platform Notes:

  • SQLite support is included by default (no extra installation required)
  • PostgreSQL extras may have compilation requirements on some systems
  • All extras support both synchronous and asynchronous operations where applicable

Database Classes

SQLite

Example:

import sqlalchemy as sa
from ddcDatabases import DBUtils, Sqlite
from your_models import Model  # Your SQLAlchemy model

with Sqlite(filepath="data.db") as session:
    db_utils = DBUtils(session)
    stmt = sa.select(Model).where(Model.id == 1)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

MSSQL (SQL Server)

Synchronous Example:

import sqlalchemy as sa
from ddcDatabases import DBUtils, MSSQL
from your_models import Model

kwargs = {
    "host": "127.0.0.1",
    "port": 1433,
    "user": "sa",
    "password": "password",
    "database": "master",
    "db_schema": "dbo",
    "echo": True,
    "autoflush": True,
    "expire_on_commit": True,
    "autocommit": True,
    "connection_timeout": 30,
    "pool_recycle": 3600,
    "pool_size": 25,
    "max_overflow": 50,
}

with MSSQL(**kwargs) as session:
    stmt = sa.select(Model).where(Model.id == 1)
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

Asynchronous Example:

import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MSSQL
from your_models import Model

async def main():
    async with MSSQL(**kwargs) as session:
        stmt = sa.select(Model).where(Model.id == 1)
        db_utils = DBUtilsAsync(session)
        results = await db_utils.fetchall(stmt)
        for row in results:
            print(row)
asyncio.run(main())

PostgreSQL

Synchronous Example:

import sqlalchemy as sa
from ddcDatabases import DBUtils, PostgreSQL
from your_models import Model

kwargs = {
    "host": "127.0.0.1",
    "port": 5432,
    "user": "postgres",
    "password": "postgres",
    "database": "postgres",
    "echo": True,
    "autoflush": False,
    "expire_on_commit": False,
    "autocommit": True,
    "connection_timeout": 30,
    "pool_recycle": 3600,
    "pool_size": 25,
    "max_overflow": 50,
}

with PostgreSQL(**kwargs) as session:
    stmt = sa.select(Model).where(Model.id == 1)
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

Asynchronous Example:

import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, PostgreSQL
from your_models import Model

async def main():
    async with PostgreSQL(**kwargs) as session:
        stmt = sa.select(Model).where(Model.id == 1)
        db_utils = DBUtilsAsync(session)
        results = await db_utils.fetchall(stmt)
        for row in results:
            print(row)
asyncio.run(main())

MySQL

Synchronous Example:

import sqlalchemy as sa
from ddcDatabases import DBUtils, MySQL

kwargs = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "root",
    "database": "dev",
    "echo": True,
    "autoflush": False,
    "expire_on_commit": False,
    "autocommit": True,
    "connection_timeout": 30,
    "pool_recycle": 3600,
    "pool_size": 25,
    "max_overflow": 50,
}

with MySQL(**kwargs) as session:
    stmt = sa.text("SELECT * FROM users WHERE id = 1")
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

Asynchronous Example:

import asyncio
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MySQL
async def main() -> None:
    async with MySQL(**kwargs) as session:
        stmt = sa.text("SELECT * FROM users")
        db_utils = DBUtilsAsync(session)
        results = await db_utils.fetchall(stmt)
        for row in results:
            print(row)
asyncio.run(main())

Oracle

Example with explicit credentials:

import sqlalchemy as sa
from ddcDatabases import DBUtils, Oracle

kwargs = {
    "host": "127.0.0.1",
    "port": 1521,
    "user": "system",
    "password": "oracle",
    "servicename": "xe",
    "echo": False,
    "autoflush": False,
    "expire_on_commit": False,
    "autocommit": True,
    "connection_timeout": 30,
    "pool_recycle": 3600,
    "pool_size": 25,
    "max_overflow": 50,
}

with Oracle(**kwargs) as session:
    stmt = sa.text("SELECT * FROM dual")
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

MongoDB

Example with explicit credentials:

from ddcDatabases import MongoDB
from bson.objectid import ObjectId

kwargs = {
    "host": "127.0.0.1",
    "port": 27017,
    "user": "admin",
    "password": "admin",
    "database": "admin",
    "collection": "test_collection",
    "sort_column": "_id",
    "sort_order": "asc", # asc or desc
}

query = {"_id": ObjectId("689c9f71dd642a68cfc60477")}
with MongoDB(**kwargs, query=query) as cursor:
    for each in cursor:
        print(each)

Database Engines

Access the underlying SQLAlchemy engine for advanced operations:

Synchronous Engine:

from ddcDatabases import PostgreSQL

with PostgreSQL() as session:
    engine = session.bind
    # Use engine for advanced operations

Asynchronous Engine:

from ddcDatabases import PostgreSQL

async def main():
    async with PostgreSQL() as session:
        engine = session.bind
        # Use engine for advanced operations

Database Utilities

The DBUtils and DBUtilsAsync classes provide convenient methods for common database operations:

Available Methods

from ddcDatabases import DBUtils, DBUtilsAsync

# Synchronous utilities
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)           # Returns list of RowMapping objects
value = db_utils.fetchvalue(stmt)           # Returns single value as string
db_utils.insert(stmt)                       # Insert into model table
db_utils.deleteall(model)                   # Delete all records from model
db_utils.insertbulk(model, data_list)      # Bulk insert from list of dictionaries
db_utils.execute(stmt)                      # Execute any SQLAlchemy statement

# Asynchronous utilities (similar interface with await)
db_utils_async = DBUtilsAsync(session)
results = await db_utils_async.fetchall(stmt)

Logging

import logging
logging.getLogger('ddcDatabases').setLevel(logging.INFO)
logging.getLogger('ddcDatabases').addHandler(logging.StreamHandler())

Development

Building from Source

poetry build -f wheel

Running Tests

poetry update --with test
poe tests

License

Released under the MIT License

Support

If you find this project helpful, consider supporting development: