-
Notifications
You must be signed in to change notification settings - Fork 0
PostgreSQL connection caching and reuse #131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
452008d
731d259
f9e9f29
9d4cba7
3cdabd0
07712ff
73d8b4d
2955caf
6c3ab3d
e579b94
714e0fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,69 +17,57 @@ | |
| """Postgres reader for run/job statistics.""" | ||
|
|
||
| import logging | ||
| import os | ||
| import time | ||
| from dataclasses import dataclass | ||
| from datetime import datetime, timezone | ||
| from functools import cached_property | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| import aiosql | ||
| from botocore.exceptions import BotoCoreError, ClientError | ||
|
|
||
| from src.utils.constants import ( | ||
| POSTGRES_DEFAULT_LIMIT, | ||
| POSTGRES_DEFAULT_WINDOW_MS, | ||
| POSTGRES_MAX_LIMIT, | ||
| POSTGRES_STATEMENT_TIMEOUT_MS, | ||
| REQUIRED_CONNECTION_FIELDS, | ||
| ) | ||
| from src.utils.utils import load_postgres_config | ||
|
|
||
| try: | ||
| import psycopg2 | ||
| from psycopg2 import Error as PsycopgError | ||
| from psycopg2 import sql as psycopg2_sql | ||
| except ImportError: | ||
| psycopg2 = None # type: ignore | ||
| psycopg2_sql = None # type: ignore | ||
|
|
||
| class PsycopgError(Exception): # type: ignore | ||
| """Shim psycopg2 error base when psycopg2 is not installed.""" | ||
|
|
||
| from src.utils.postgres_base import PsycopgError, PostgresBase | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _RUNS_SQL_BASE = ( | ||
| "SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app," | ||
| " r.source_app_version, r.environment," | ||
| " r.timestamp_start AS run_timestamp_start," | ||
| " r.timestamp_end AS run_timestamp_end," | ||
| " j.internal_id, j.country, j.catalog_id, j.status," | ||
| " j.timestamp_start, j.timestamp_end, j.message, j.additional_info" | ||
| " FROM public_cps_za_runs_jobs j" | ||
| " INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id" | ||
| " WHERE r.timestamp_start >= %s AND r.timestamp_start <= %s" | ||
| ) | ||
| _SQL_DIR = Path(__file__).parent / "sql" | ||
|
|
||
|
|
||
| _RUNS_SQL_TAIL = " ORDER BY j.internal_id DESC LIMIT %s" | ||
| @dataclass(frozen=True) | ||
| class ReaderQueries: | ||
| """Typed holder for reader SQL query strings loaded via aiosql.""" | ||
|
|
||
| _RUNS_SQL = _RUNS_SQL_BASE + _RUNS_SQL_TAIL | ||
| _RUNS_SQL_WITH_CURSOR = _RUNS_SQL_BASE + " AND j.internal_id < %s" + _RUNS_SQL_TAIL | ||
| get_stats: str | ||
| get_stats_with_cursor: str | ||
|
|
||
|
|
||
| class ReaderPostgres: | ||
| class ReaderPostgres(PostgresBase): | ||
| """Read-only Postgres accessor for run/job statistics.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self._secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") | ||
| self._secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") | ||
| self._db_config: dict[str, Any] | None = None | ||
| super().__init__() | ||
| logger.debug("Initialized PostgreSQL reader.") | ||
|
|
||
| def _load_db_config(self) -> dict[str, Any]: | ||
| """Load database config from AWS Secrets Manager if not already loaded.""" | ||
| if self._db_config is None: | ||
| self._db_config = load_postgres_config(self._secret_name, self._secret_region) | ||
| config = self._db_config | ||
| if config is None: | ||
| raise RuntimeError("Failed to load database configuration.") | ||
| return config | ||
| def _connect_options(self) -> str | None: | ||
| """Set statement timeout and read-only mode for reader connections.""" | ||
| return f"-c statement_timeout={POSTGRES_STATEMENT_TIMEOUT_MS}" " -c default_transaction_read_only=on" | ||
|
|
||
| @cached_property | ||
| def _queries(self) -> ReaderQueries: | ||
| """Load SQL queries from the `sql/` directory via aiosql.""" | ||
| queries = aiosql.from_path(_SQL_DIR, "psycopg2") | ||
| return ReaderQueries( | ||
| get_stats=queries.get_stats.sql, | ||
| get_stats_with_cursor=queries.get_stats_with_cursor.sql, | ||
| ) | ||
|
|
||
| def read_stats( | ||
| self, | ||
|
|
@@ -102,44 +90,25 @@ def read_stats( | |
| Raises: | ||
| RuntimeError: On database connectivity or query errors. | ||
| """ | ||
| db_config = self._load_db_config() | ||
| required_keys = ("database", "host", "user", "password", "port") | ||
| missing_keys = [key for key in required_keys if not db_config.get(key)] | ||
| if missing_keys: | ||
| raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing_keys)}.") | ||
| if psycopg2 is None: | ||
| raise RuntimeError("psycopg2 is not available.") | ||
| config = self._pg_config | ||
| if not config.get("database"): | ||
| raise RuntimeError("PostgreSQL config missing: database.") | ||
| if not all(config.get(field) for field in REQUIRED_CONNECTION_FIELDS): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not something like this? missing = [field for field in REQUIRED_CONNECTION_FIELDS if not config.get(field)]
if missing:
raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing)}.")or, if you want not just fields being present but also being non-empty, then maybe something like: missing = [
field for field in REQUIRED_CONNECTION_FIELDS
if field not in config or config[field] in (None, "")
]
if missing:
raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing)}.")There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, you don't need to iterate over it twice and retrieve items from the config twice |
||
| missing = [field for field in REQUIRED_CONNECTION_FIELDS if not config.get(field)] | ||
| raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing)}.") | ||
|
|
||
| limit = max(1, min(limit, POSTGRES_MAX_LIMIT)) | ||
| now_ms = int(time.time() * 1000) | ||
| ts_start = timestamp_start if timestamp_start is not None else (now_ms - POSTGRES_DEFAULT_WINDOW_MS) | ||
| ts_end = timestamp_end if timestamp_end is not None else now_ms | ||
|
|
||
| params: list[Any] = [ts_start, ts_end] | ||
| if cursor is not None: | ||
| params.append(cursor) | ||
| query = psycopg2_sql.SQL(_RUNS_SQL_WITH_CURSOR) | ||
| else: | ||
| query = psycopg2_sql.SQL(_RUNS_SQL) | ||
| params.append(limit + 1) | ||
|
|
||
| try: | ||
| with psycopg2.connect( # type: ignore[attr-defined] | ||
| database=db_config["database"], | ||
| host=db_config["host"], | ||
| user=db_config["user"], | ||
| password=db_config["password"], | ||
| port=db_config["port"], | ||
| connect_timeout=10, | ||
| gssencmode="disable", | ||
| options="-c statement_timeout=30000 -c default_transaction_read_only=on", | ||
| ) as connection: | ||
| with connection.cursor() as db_cursor: | ||
| db_cursor.execute(query, params) | ||
| col_names = [desc[0] for desc in db_cursor.description] # type: ignore[union-attr] | ||
| raw_rows = db_cursor.fetchall() | ||
| col_names, raw_rows = self._execute_with_retry( | ||
| lambda conn: self._run_stats_query(conn, ts_start, ts_end, cursor, limit) | ||
| ) | ||
| except PsycopgError as exc: | ||
| raise RuntimeError(f"Database query failed: {exc}") from exc | ||
| self._close_connection() | ||
| raise RuntimeError(f"Database query error: {exc}") from exc | ||
|
|
||
| rows = [dict(zip(col_names, row, strict=True)) for row in raw_rows] | ||
|
|
||
|
|
@@ -162,6 +131,41 @@ def read_stats( | |
| logger.debug("Stats query returned %d rows.", len(rows)) | ||
| return rows, pagination | ||
|
|
||
| def _run_stats_query( | ||
| self, | ||
| connection: Any, | ||
| ts_start: int, | ||
| ts_end: int, | ||
| cursor: int | None, | ||
| limit: int, | ||
| ) -> tuple[list[str], list[tuple[Any, ...]]]: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Such a complicated type could deserve its own type alias, like here: https://docs.python.org/3/library/typing.html#type-aliases (the whole type or at least some part of it) |
||
| """Execute the stats SQL query and return column names and raw rows.""" | ||
| try: | ||
| with connection.cursor() as db_cursor: | ||
| if cursor is not None: | ||
| db_cursor.execute( | ||
| self._queries.get_stats_with_cursor, | ||
| {"ts_start": ts_start, "ts_end": ts_end, "cursor_id": cursor, "lim": limit + 1}, | ||
| ) | ||
| else: | ||
| db_cursor.execute( | ||
| self._queries.get_stats, | ||
| {"ts_start": ts_start, "ts_end": ts_end, "lim": limit + 1}, | ||
| ) | ||
| if db_cursor.description is None: | ||
| raise RuntimeError("Stats query returned no result description.") | ||
| col_names = [desc[0] for desc in db_cursor.description] | ||
| raw_rows = db_cursor.fetchall() | ||
| finally: | ||
| # Rollback closes the implicit transaction opened by the SELECT, | ||
| # leaving the cached connection in a clean idle state for reuse. | ||
| try: | ||
| connection.rollback() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I understand. It's not because of the SELECT query itself; it's probably that in psycopg2, every query runs inside a transaction unless autocommit is enabled. And it's not enabled (I found only 1 occurrence, in integration tests). So it's more like this: End the implicit transaction started by the query. Even SELECTs open a transaction in PostgreSQL; if left open, the connection remains "idle in transaction", which can cause MVCC bloat and issues when reusing the connection. Maybe improve the comment, but I think the overall approach is okay as is. It's a more defensive strategy |
||
| except PsycopgError: | ||
| logger.debug("Failed to close the implicit transaction. Closing cached connection.", exc_info=True) | ||
| self._close_connection() | ||
| return col_names, raw_rows | ||
|
|
||
| @staticmethod | ||
| def _format_row(row: dict[str, Any]) -> dict[str, Any]: | ||
| """Add computed columns to a result row. | ||
|
|
@@ -227,14 +231,14 @@ def check_health(self) -> tuple[bool, str]: | |
| return False, "postgres secret not configured" | ||
|
|
||
| try: | ||
| db_config = self._load_db_config() | ||
| pg_config = self._pg_config | ||
| except (BotoCoreError, ClientError, RuntimeError, ValueError, KeyError) as err: | ||
| return False, str(err) | ||
|
|
||
| if not db_config.get("database"): | ||
| if not pg_config.get("database"): | ||
| return False, "database not configured" | ||
|
|
||
| missing = [f for f in ("host", "user", "password", "port") if not db_config.get(f)] | ||
| missing = [field for field in REQUIRED_CONNECTION_FIELDS if not pg_config.get(field)] | ||
| if missing: | ||
| return False, f"{missing[0]} not configured" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| -- name: get_stats(ts_start, ts_end, lim) | ||
| -- Get run/job statistics with keyset pagination. | ||
| SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app, | ||
| r.source_app_version, r.environment, | ||
| r.timestamp_start AS run_timestamp_start, | ||
| r.timestamp_end AS run_timestamp_end, | ||
| j.internal_id, j.country, j.catalog_id, j.status, | ||
| j.timestamp_start, j.timestamp_end, j.message, j.additional_info | ||
| FROM public_cps_za_runs_jobs j | ||
| INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id | ||
| WHERE r.timestamp_start >= :ts_start AND r.timestamp_start <= :ts_end | ||
| ORDER BY j.internal_id DESC | ||
| LIMIT :lim; | ||
|
|
||
| -- name: get_stats_with_cursor(ts_start, ts_end, cursor_id, lim) | ||
| -- Get run/job statistics with cursor-based keyset pagination. | ||
| SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app, | ||
| r.source_app_version, r.environment, | ||
| r.timestamp_start AS run_timestamp_start, | ||
| r.timestamp_end AS run_timestamp_end, | ||
| j.internal_id, j.country, j.catalog_id, j.status, | ||
| j.timestamp_start, j.timestamp_end, j.message, j.additional_info | ||
| FROM public_cps_za_runs_jobs j | ||
| INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id | ||
| WHERE r.timestamp_start >= :ts_start AND r.timestamp_start <= :ts_end | ||
| AND j.internal_id < :cursor_id | ||
| ORDER BY j.internal_id DESC | ||
| LIMIT :lim; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,24 +16,43 @@ | |
|
|
||
| """Constants and enums used across the project.""" | ||
|
|
||
| from typing import Any | ||
| from typing import TypedDict | ||
|
|
||
| # Configuration keys | ||
| TOKEN_PROVIDER_URL_KEY = "token_provider_url" | ||
| TOKEN_PUBLIC_KEY_URL_KEY = "token_public_key_url" | ||
| TOKEN_PUBLIC_KEYS_URL_KEY = "token_public_keys_url" | ||
| SSL_CA_BUNDLE_KEY = "ssl_ca_bundle" | ||
|
|
||
| # Postgres connection | ||
| POSTGRES_CONNECT_TIMEOUT_SECONDS = 5 | ||
| POSTGRES_STATEMENT_TIMEOUT_MS = 30000 | ||
| POSTGRES_MAX_RETRIES = 2 | ||
| REQUIRED_CONNECTION_FIELDS = ("host", "user", "password", "port") | ||
|
|
||
| # Postgres stats defaults | ||
| POSTGRES_DEFAULT_LIMIT = 50 | ||
| POSTGRES_MAX_LIMIT = 1000 | ||
| POSTGRES_DEFAULT_WINDOW_MS = 7 * 24 * 60 * 60 * 1000 # 7 days in milliseconds | ||
|
|
||
| SUPPORTED_TOPICS: list[str] = ["public.cps.za.runs"] | ||
| # Topic name constants | ||
| TOPIC_RUNS = "public.cps.za.runs" | ||
| TOPIC_DLCHANGE = "public.cps.za.dlchange" | ||
| TOPIC_TEST = "public.cps.za.test" | ||
|
|
||
| SUPPORTED_TOPICS: list[str] = [TOPIC_RUNS] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This feels a bit misleading. "Supported topics" sounds like topics broadly supported by the project, but I checked its usages and it's used only for the |
||
|
|
||
|
|
||
| class TopicTableConfig(TypedDict, total=False): | ||
| """Structure describing a topic's PostgreSQL table mapping.""" | ||
|
|
||
| main: str | ||
| jobs: str | ||
| columns: dict[str, list[str]] | ||
|
|
||
|
|
||
| # Maps topic names to their PostgreSQL table(s) | ||
| TOPIC_TABLE_MAP: dict[str, dict[str, Any]] = { | ||
| "public.cps.za.runs": { | ||
| TOPIC_TABLE_MAP: dict[str, TopicTableConfig] = { | ||
| TOPIC_RUNS: { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am a bit confused about this — what's the usage? I see only 1 IF condition on Also, consider renaming it to |
||
| "main": "public_cps_za_runs", | ||
| "jobs": "public_cps_za_runs_jobs", | ||
| "columns": { | ||
|
|
@@ -60,7 +79,7 @@ | |
| ], | ||
| }, | ||
| }, | ||
| "public.cps.za.dlchange": { | ||
| TOPIC_DLCHANGE: { | ||
| "main": "public_cps_za_dlchange", | ||
| "columns": { | ||
| "main": [ | ||
|
|
@@ -80,7 +99,7 @@ | |
| ], | ||
| }, | ||
| }, | ||
| "public.cps.za.test": { | ||
| TOPIC_TEST: { | ||
| "main": "public_cps_za_test", | ||
| "columns": { | ||
| "main": [ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓