From b9e2dd5789fadd709223682217eb2c422ace612a Mon Sep 17 00:00:00 2001 From: eutwt <11261404+eutwt@users.noreply.github.com> Date: Sun, 28 Dec 2025 16:10:49 -0500 Subject: [PATCH 1/3] Refactor comparison module layout --- python/versus/comparison/__init__.py | 3 +- python/versus/comparison/_compute.py | 330 +------- python/versus/comparison/_frames.py | 317 +++++++ python/versus/comparison/_helpers.py | 799 +++--------------- python/versus/comparison/_inputs.py | 181 ++++ python/versus/comparison/_relations.py | 36 + python/versus/comparison/_slices.py | 2 +- python/versus/comparison/_sql.py | 147 ++++ python/versus/comparison/_summary.py | 99 +++ python/versus/comparison/_types.py | 58 ++ python/versus/comparison/_validation.py | 206 +++++ python/versus/comparison/_value_diffs.py | 2 +- python/versus/comparison/_weave.py | 2 +- python/versus/comparison/api.py | 161 ++++ .../comparison/{_core.py => comparison.py} | 155 +--- 15 files changed, 1329 insertions(+), 1169 deletions(-) create mode 100644 python/versus/comparison/_frames.py create mode 100644 python/versus/comparison/_inputs.py create mode 100644 python/versus/comparison/_relations.py create mode 100644 python/versus/comparison/_sql.py create mode 100644 python/versus/comparison/_summary.py create mode 100644 python/versus/comparison/_types.py create mode 100644 python/versus/comparison/_validation.py create mode 100644 python/versus/comparison/api.py rename python/versus/comparison/{_core.py => comparison.py} (80%) diff --git a/python/versus/comparison/__init__.py b/python/versus/comparison/__init__.py index 232654c..86e2ab5 100644 --- a/python/versus/comparison/__init__.py +++ b/python/versus/comparison/__init__.py @@ -1,4 +1,5 @@ -from ._core import Comparison, compare +from .api import compare +from .comparison import Comparison from ._exceptions import ComparisonError __all__ = [ diff --git a/python/versus/comparison/_compute.py b/python/versus/comparison/_compute.py index 7efb59c..5d52ec8 100644 --- a/python/versus/comparison/_compute.py +++ b/python/versus/comparison/_compute.py @@ -1,317 +1,13 @@ -from __future__ import annotations - -from typing import Dict, List, Mapping, Optional, Tuple - -import duckdb - -from . import _helpers as h - - -def build_tables_frame( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - materialize: bool, -) -> duckdb.DuckDBPyRelation: - def row_for(identifier: str) -> Tuple[str, int, int]: - handle = handles[identifier] - return identifier, h.table_count(handle), len(handle.columns) - - rows = [row_for(identifier) for identifier in table_id] - schema = [ - ("table_name", "VARCHAR"), - ("nrow", "BIGINT"), - ("ncol", "BIGINT"), - ] - return h.build_rows_relation(conn, rows, schema, materialize) - - -def build_by_frame( - conn: h.VersusConn, - by_columns: List[str], - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - materialize: bool, -) -> duckdb.DuckDBPyRelation: - first, second = table_id - rows = [ - ( - column, - handles[first].types[column], - handles[second].types[column], - ) - for column in by_columns - ] - schema = [ - ("column", "VARCHAR"), - (f"type_{first}", "VARCHAR"), - (f"type_{second}", "VARCHAR"), - ] - return h.build_rows_relation(conn, rows, schema, materialize) - - -def build_unmatched_cols( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - materialize: bool, -) -> duckdb.DuckDBPyRelation: - first, second = table_id - cols_first = set(handles[first].columns) - cols_second = set(handles[second].columns) - rows = [ - (first, column, handles[first].types[column]) - for column in sorted(cols_first - cols_second) - ] + [ - (second, column, handles[second].types[column]) - for column in sorted(cols_second - cols_first) - ] - schema = [ - ("table_name", "VARCHAR"), - ("column", "VARCHAR"), - ("type", "VARCHAR"), - ] - return h.build_rows_relation(conn, rows, schema, materialize) - - -def build_intersection_frame( - value_columns: List[str], - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], - allow_both_na: bool, - diff_table: Optional[duckdb.DuckDBPyRelation], - conn: h.VersusConn, - materialize: bool, -) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: - if diff_table is None: - return _build_intersection_frame_inline( - value_columns, - handles, - table_id, - by_columns, - allow_both_na, - conn, - materialize, - ) - return _build_intersection_frame_with_table( - value_columns, handles, table_id, diff_table, conn, materialize - ) - - -def _build_empty_intersection_relation( - conn: h.VersusConn, - table_id: Tuple[str, str], - materialize: bool, -) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: - first, second = table_id - schema = [ - ("column", "VARCHAR"), - ("n_diffs", "BIGINT"), - (f"type_{first}", "VARCHAR"), - (f"type_{second}", "VARCHAR"), - ] - relation = h.build_rows_relation(conn, [], schema, materialize) - return relation, {} if materialize else None - - -def _build_intersection_frame_with_table( - value_columns: List[str], - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - diff_table: duckdb.DuckDBPyRelation, - conn: h.VersusConn, - materialize: bool, -) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: - first, second = table_id - if not value_columns: - return _build_empty_intersection_relation(conn, table_id, materialize) - - def diff_alias(column: str) -> str: - return f"n_diffs_{column}" - - count_columns = ",\n ".join( - f"COUNT(*) FILTER (WHERE diffs.{h.ident(column)}) " - f"AS {h.ident(diff_alias(column))}" - for column in value_columns - ) - - def select_for(column: str) -> str: - return f""" - SELECT - {h.sql_literal(column)} AS {h.ident("column")}, - counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, - {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, - {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} - FROM - counts - """ - - diff_table_sql = diff_table.sql_query() - sql = f""" - WITH counts AS ( - SELECT - {count_columns} - FROM - ({diff_table_sql}) AS diffs - ) - {" UNION ALL ".join(select_for(column) for column in value_columns)} - """ - relation = h.finalize_relation(conn, sql, materialize) - if not materialize: - return relation, None - return relation, h.diff_lookup_from_intersection(relation) - - -def _build_intersection_frame_inline( - value_columns: List[str], - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], - allow_both_na: bool, - conn: h.VersusConn, - materialize: bool, -) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: - if not value_columns: - return _build_empty_intersection_relation(conn, table_id, materialize) - first, second = table_id - join_sql = h.inputs_join_sql(handles, table_id, by_columns) - - def diff_alias(column: str) -> str: - return f"n_diffs_{column}" - - count_columns = ",\n ".join( - f"COUNT(*) FILTER (WHERE {h.diff_predicate(column, allow_both_na, 'a', 'b')}) " - f"AS {h.ident(diff_alias(column))}" - for column in value_columns - ) - - def select_for(column: str) -> str: - return f""" - SELECT - {h.sql_literal(column)} AS {h.ident("column")}, - counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, - {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, - {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} - FROM - counts - """ - - sql = f""" - WITH counts AS ( - SELECT - {count_columns} - FROM - {join_sql} - ) - {" UNION ALL ".join(select_for(column) for column in value_columns)} - """ - relation = h.finalize_relation(conn, sql, materialize) - if not materialize: - return relation, None - return relation, h.diff_lookup_from_intersection(relation) - - -def compute_diff_table( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], - value_columns: List[str], - allow_both_na: bool, -) -> duckdb.DuckDBPyRelation: - if not value_columns: - schema = [(column, handles[table_id[0]].types[column]) for column in by_columns] - return h.build_rows_relation(conn, [], schema, materialize=True) - join_sql = h.inputs_join_sql(handles, table_id, by_columns) - select_by = h.select_cols(by_columns, alias="a") - diff_expressions = [ - (column, h.diff_predicate(column, allow_both_na, "a", "b")) - for column in value_columns - ] - diff_flags = ",\n ".join( - f"{expression} AS {h.ident(column)}" for column, expression in diff_expressions - ) - predicate = " OR ".join(expression for _, expression in diff_expressions) - sql = f""" - SELECT - {select_by}, - {diff_flags} - FROM - {join_sql} - WHERE - {predicate} - """ - return h.finalize_relation(conn, sql, materialize=True) - - -def compute_unmatched_keys( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], - materialize: bool, -) -> duckdb.DuckDBPyRelation: - def key_part(identifier: str) -> str: - other = table_id[1] if identifier == table_id[0] else table_id[0] - handle_left = handles[identifier] - handle_right = handles[other] - select_by = h.select_cols(by_columns, alias="left_tbl") - condition = h.join_condition(by_columns, "left_tbl", "right_tbl") - return f""" - SELECT - {h.sql_literal(identifier)} AS table_name, - {select_by} - FROM - {h.table_ref(handle_left)} AS left_tbl - ANTI JOIN {h.table_ref(handle_right)} AS right_tbl - ON {condition} - """ - - keys_parts = [key_part(identifier) for identifier in table_id] - unmatched_keys_sql = " UNION ALL ".join(keys_parts) - return h.finalize_relation(conn, unmatched_keys_sql, materialize) - - -def compute_unmatched_rows_summary( - conn: h.VersusConn, - unmatched_keys: duckdb.DuckDBPyRelation, - table_id: Tuple[str, str], - materialize: bool, -) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: - unmatched_keys_sql = unmatched_keys.sql_query() - table_col = h.ident("table_name") - count_col = h.ident("n_unmatched") - base_sql = h.rows_relation_sql( - [(table_id[0],), (table_id[1],)], [("table_name", "VARCHAR")] - ) - counts_sql = f""" - SELECT - {table_col}, - COUNT(*) AS {count_col} - FROM - ({unmatched_keys_sql}) AS keys - GROUP BY - {table_col} - """ - order_case = ( - f"CASE base.{table_col} " - f"WHEN {h.sql_literal(table_id[0])} THEN 0 " - f"WHEN {h.sql_literal(table_id[1])} THEN 1 " - "ELSE 2 END" - ) - sql = f""" - SELECT - base.{table_col} AS {table_col}, - COALESCE(counts.{count_col}, CAST(0 AS BIGINT)) AS {count_col} - FROM - ({base_sql}) AS base - LEFT JOIN ({counts_sql}) AS counts - ON base.{table_col} = counts.{table_col} - ORDER BY - {order_case} - """ - relation = h.finalize_relation(conn, sql, materialize) - if not materialize: - return relation, None - return relation, h.unmatched_lookup_from_rows(relation) +from . import _frames as _frames + +build_tables_frame = _frames.build_tables_frame +build_by_frame = _frames.build_by_frame +build_unmatched_cols = _frames.build_unmatched_cols +build_intersection_frame = _frames.build_intersection_frame +compute_diff_table = _frames.compute_diff_table +compute_unmatched_keys = _frames.compute_unmatched_keys +compute_unmatched_rows_summary = _frames.compute_unmatched_rows_summary + +_build_empty_intersection_relation = _frames._build_empty_intersection_relation +_build_intersection_frame_with_table = _frames._build_intersection_frame_with_table +_build_intersection_frame_inline = _frames._build_intersection_frame_inline diff --git a/python/versus/comparison/_frames.py b/python/versus/comparison/_frames.py new file mode 100644 index 0000000..7efb59c --- /dev/null +++ b/python/versus/comparison/_frames.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +from typing import Dict, List, Mapping, Optional, Tuple + +import duckdb + +from . import _helpers as h + + +def build_tables_frame( + conn: h.VersusConn, + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + materialize: bool, +) -> duckdb.DuckDBPyRelation: + def row_for(identifier: str) -> Tuple[str, int, int]: + handle = handles[identifier] + return identifier, h.table_count(handle), len(handle.columns) + + rows = [row_for(identifier) for identifier in table_id] + schema = [ + ("table_name", "VARCHAR"), + ("nrow", "BIGINT"), + ("ncol", "BIGINT"), + ] + return h.build_rows_relation(conn, rows, schema, materialize) + + +def build_by_frame( + conn: h.VersusConn, + by_columns: List[str], + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + materialize: bool, +) -> duckdb.DuckDBPyRelation: + first, second = table_id + rows = [ + ( + column, + handles[first].types[column], + handles[second].types[column], + ) + for column in by_columns + ] + schema = [ + ("column", "VARCHAR"), + (f"type_{first}", "VARCHAR"), + (f"type_{second}", "VARCHAR"), + ] + return h.build_rows_relation(conn, rows, schema, materialize) + + +def build_unmatched_cols( + conn: h.VersusConn, + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + materialize: bool, +) -> duckdb.DuckDBPyRelation: + first, second = table_id + cols_first = set(handles[first].columns) + cols_second = set(handles[second].columns) + rows = [ + (first, column, handles[first].types[column]) + for column in sorted(cols_first - cols_second) + ] + [ + (second, column, handles[second].types[column]) + for column in sorted(cols_second - cols_first) + ] + schema = [ + ("table_name", "VARCHAR"), + ("column", "VARCHAR"), + ("type", "VARCHAR"), + ] + return h.build_rows_relation(conn, rows, schema, materialize) + + +def build_intersection_frame( + value_columns: List[str], + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], + allow_both_na: bool, + diff_table: Optional[duckdb.DuckDBPyRelation], + conn: h.VersusConn, + materialize: bool, +) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: + if diff_table is None: + return _build_intersection_frame_inline( + value_columns, + handles, + table_id, + by_columns, + allow_both_na, + conn, + materialize, + ) + return _build_intersection_frame_with_table( + value_columns, handles, table_id, diff_table, conn, materialize + ) + + +def _build_empty_intersection_relation( + conn: h.VersusConn, + table_id: Tuple[str, str], + materialize: bool, +) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: + first, second = table_id + schema = [ + ("column", "VARCHAR"), + ("n_diffs", "BIGINT"), + (f"type_{first}", "VARCHAR"), + (f"type_{second}", "VARCHAR"), + ] + relation = h.build_rows_relation(conn, [], schema, materialize) + return relation, {} if materialize else None + + +def _build_intersection_frame_with_table( + value_columns: List[str], + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + diff_table: duckdb.DuckDBPyRelation, + conn: h.VersusConn, + materialize: bool, +) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: + first, second = table_id + if not value_columns: + return _build_empty_intersection_relation(conn, table_id, materialize) + + def diff_alias(column: str) -> str: + return f"n_diffs_{column}" + + count_columns = ",\n ".join( + f"COUNT(*) FILTER (WHERE diffs.{h.ident(column)}) " + f"AS {h.ident(diff_alias(column))}" + for column in value_columns + ) + + def select_for(column: str) -> str: + return f""" + SELECT + {h.sql_literal(column)} AS {h.ident("column")}, + counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, + {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, + {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} + FROM + counts + """ + + diff_table_sql = diff_table.sql_query() + sql = f""" + WITH counts AS ( + SELECT + {count_columns} + FROM + ({diff_table_sql}) AS diffs + ) + {" UNION ALL ".join(select_for(column) for column in value_columns)} + """ + relation = h.finalize_relation(conn, sql, materialize) + if not materialize: + return relation, None + return relation, h.diff_lookup_from_intersection(relation) + + +def _build_intersection_frame_inline( + value_columns: List[str], + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], + allow_both_na: bool, + conn: h.VersusConn, + materialize: bool, +) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: + if not value_columns: + return _build_empty_intersection_relation(conn, table_id, materialize) + first, second = table_id + join_sql = h.inputs_join_sql(handles, table_id, by_columns) + + def diff_alias(column: str) -> str: + return f"n_diffs_{column}" + + count_columns = ",\n ".join( + f"COUNT(*) FILTER (WHERE {h.diff_predicate(column, allow_both_na, 'a', 'b')}) " + f"AS {h.ident(diff_alias(column))}" + for column in value_columns + ) + + def select_for(column: str) -> str: + return f""" + SELECT + {h.sql_literal(column)} AS {h.ident("column")}, + counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, + {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, + {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} + FROM + counts + """ + + sql = f""" + WITH counts AS ( + SELECT + {count_columns} + FROM + {join_sql} + ) + {" UNION ALL ".join(select_for(column) for column in value_columns)} + """ + relation = h.finalize_relation(conn, sql, materialize) + if not materialize: + return relation, None + return relation, h.diff_lookup_from_intersection(relation) + + +def compute_diff_table( + conn: h.VersusConn, + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], + value_columns: List[str], + allow_both_na: bool, +) -> duckdb.DuckDBPyRelation: + if not value_columns: + schema = [(column, handles[table_id[0]].types[column]) for column in by_columns] + return h.build_rows_relation(conn, [], schema, materialize=True) + join_sql = h.inputs_join_sql(handles, table_id, by_columns) + select_by = h.select_cols(by_columns, alias="a") + diff_expressions = [ + (column, h.diff_predicate(column, allow_both_na, "a", "b")) + for column in value_columns + ] + diff_flags = ",\n ".join( + f"{expression} AS {h.ident(column)}" for column, expression in diff_expressions + ) + predicate = " OR ".join(expression for _, expression in diff_expressions) + sql = f""" + SELECT + {select_by}, + {diff_flags} + FROM + {join_sql} + WHERE + {predicate} + """ + return h.finalize_relation(conn, sql, materialize=True) + + +def compute_unmatched_keys( + conn: h.VersusConn, + handles: Mapping[str, h._TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], + materialize: bool, +) -> duckdb.DuckDBPyRelation: + def key_part(identifier: str) -> str: + other = table_id[1] if identifier == table_id[0] else table_id[0] + handle_left = handles[identifier] + handle_right = handles[other] + select_by = h.select_cols(by_columns, alias="left_tbl") + condition = h.join_condition(by_columns, "left_tbl", "right_tbl") + return f""" + SELECT + {h.sql_literal(identifier)} AS table_name, + {select_by} + FROM + {h.table_ref(handle_left)} AS left_tbl + ANTI JOIN {h.table_ref(handle_right)} AS right_tbl + ON {condition} + """ + + keys_parts = [key_part(identifier) for identifier in table_id] + unmatched_keys_sql = " UNION ALL ".join(keys_parts) + return h.finalize_relation(conn, unmatched_keys_sql, materialize) + + +def compute_unmatched_rows_summary( + conn: h.VersusConn, + unmatched_keys: duckdb.DuckDBPyRelation, + table_id: Tuple[str, str], + materialize: bool, +) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: + unmatched_keys_sql = unmatched_keys.sql_query() + table_col = h.ident("table_name") + count_col = h.ident("n_unmatched") + base_sql = h.rows_relation_sql( + [(table_id[0],), (table_id[1],)], [("table_name", "VARCHAR")] + ) + counts_sql = f""" + SELECT + {table_col}, + COUNT(*) AS {count_col} + FROM + ({unmatched_keys_sql}) AS keys + GROUP BY + {table_col} + """ + order_case = ( + f"CASE base.{table_col} " + f"WHEN {h.sql_literal(table_id[0])} THEN 0 " + f"WHEN {h.sql_literal(table_id[1])} THEN 1 " + "ELSE 2 END" + ) + sql = f""" + SELECT + base.{table_col} AS {table_col}, + COALESCE(counts.{count_col}, CAST(0 AS BIGINT)) AS {count_col} + FROM + ({base_sql}) AS base + LEFT JOIN ({counts_sql}) AS counts + ON base.{table_col} = counts.{table_col} + ORDER BY + {order_case} + """ + relation = h.finalize_relation(conn, sql, materialize) + if not materialize: + return relation, None + return relation, h.unmatched_lookup_from_rows(relation) diff --git a/python/versus/comparison/_helpers.py b/python/versus/comparison/_helpers.py index 3387e0e..54e319c 100644 --- a/python/versus/comparison/_helpers.py +++ b/python/versus/comparison/_helpers.py @@ -1,697 +1,108 @@ -from __future__ import annotations - -import uuid -from collections import Counter -from dataclasses import dataclass -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Sequence, - Tuple, - Union, - cast, -) - -import duckdb - from ._exceptions import ComparisonError +from ._inputs import ( + assert_relation_connection, + build_table_handle, + build_table_handle_from_frame, + build_table_handle_from_relation, + describe_source, + raise_relation_connection_error, + resolve_row_count, + row_count_from_frame, + source_ref_for_sql, +) +from ._relations import ( + diff_lookup_from_intersection, + relation_is_empty, + table_count, + unmatched_lookup_from_rows, +) +from ._sql import ( + col, + collect_diff_keys, + diff_predicate, + fetch_rows_by_keys, + ident, + inputs_join_sql, + join_condition, + require_diff_table, + run_sql, + select_cols, + select_zero_from_table, + sql_literal, + table_ref, +) +from ._summary import ( + SummaryRelation, + build_rows_relation, + finalize_relation, + materialize_temp_table, + rows_relation_sql, +) +from ._types import _Input, _TableHandle, VersusConn, VersusState +from ._validation import ( + assert_column_allowed, + assert_unique_by, + normalize_column_list, + normalize_single_column, + normalize_table_arg, + resolve_column_list, + resolve_connection, + resolve_materialize, + validate_columns, + validate_columns_exist, + validate_table_id, + validate_tables, + validate_type_compatibility, +) -if TYPE_CHECKING: # pragma: no cover - import pandas - import polars - - from ._core import Comparison - -try: - from typing import TypeAlias -except ImportError: # pragma: no cover - Python < 3.10 - from typing_extensions import TypeAlias - -_Input: TypeAlias = Union[ - duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame" +__all__ = [ + "ComparisonError", + "_Input", + "_TableHandle", + "VersusConn", + "VersusState", + "SummaryRelation", + "resolve_materialize", + "resolve_connection", + "validate_columns_exist", + "validate_type_compatibility", + "validate_columns", + "validate_tables", + "assert_unique_by", + "validate_table_id", + "normalize_column_list", + "normalize_table_arg", + "normalize_single_column", + "resolve_column_list", + "assert_column_allowed", + "build_table_handle", + "build_table_handle_from_relation", + "build_table_handle_from_frame", + "describe_source", + "source_ref_for_sql", + "resolve_row_count", + "row_count_from_frame", + "raise_relation_connection_error", + "assert_relation_connection", + "ident", + "col", + "table_ref", + "select_cols", + "join_condition", + "inputs_join_sql", + "diff_predicate", + "sql_literal", + "run_sql", + "require_diff_table", + "collect_diff_keys", + "fetch_rows_by_keys", + "select_zero_from_table", + "relation_is_empty", + "diff_lookup_from_intersection", + "unmatched_lookup_from_rows", + "rows_relation_sql", + "materialize_temp_table", + "finalize_relation", + "build_rows_relation", + "table_count", ] - - -# --------------- Data structures -@dataclass -class _TableHandle: - name: str - display: str - relation: duckdb.DuckDBPyRelation - columns: List[str] - types: Dict[str, str] - source_sql: str - source_is_identifier: bool - row_count: int - - def __getattr__(self, name: str) -> Any: - return getattr(self.relation, name) - - -@dataclass -class VersusState: - temp_tables: List[str] - views: List[str] - - -class VersusConn: - def __init__( - self, - connection: duckdb.DuckDBPyConnection, - *, - temp_tables: Optional[List[str]] = None, - views: Optional[List[str]] = None, - ) -> None: - self.raw_connection = connection - self.versus = VersusState( - temp_tables if temp_tables is not None else [], - views if views is not None else [], - ) - - def __getattr__(self, name: str) -> Any: - return getattr(self.raw_connection, name) - - -class SummaryRelation: - def __init__( - self, - conn: VersusConn, - relation: duckdb.DuckDBPyRelation, - *, - materialized: bool, - on_materialize: Optional[Callable[[duckdb.DuckDBPyRelation], None]] = None, - ) -> None: - self._conn = conn - self.relation = relation - self.materialized = materialized - self._on_materialize = on_materialize - if self.materialized and self._on_materialize is not None: - self._on_materialize(self.relation) - - def materialize(self) -> None: - if self.materialized: - return - self.relation = finalize_relation( - self._conn, self.relation.sql_query(), materialize=True - ) - self.materialized = True - if self._on_materialize is not None: - self._on_materialize(self.relation) - - def __getattr__(self, name: str) -> Any: - return getattr(self.relation, name) - - def __repr__(self) -> str: - self.materialize() - return repr(self.relation) - - def __str__(self) -> str: - self.materialize() - return str(self.relation) - - def __iter__(self) -> Any: - return iter(cast(Any, self.relation)) - - -# --------------- Core-only helpers -def resolve_materialize(materialize: str) -> Tuple[bool, bool]: - if not isinstance(materialize, str) or materialize not in { - "all", - "summary", - "none", - }: - raise ComparisonError("`materialize` must be one of: 'all', 'summary', 'none'") - materialize_summary = materialize in {"all", "summary"} - materialize_keys = materialize == "all" - return materialize_summary, materialize_keys - - -def resolve_connection( - connection: Optional[duckdb.DuckDBPyConnection], -) -> VersusConn: - if connection is not None: - conn_candidate = connection - else: - default_conn = duckdb.default_connection - conn_candidate = default_conn() if callable(default_conn) else default_conn - if not isinstance(conn_candidate, duckdb.DuckDBPyConnection): - raise ComparisonError("`connection` must be a DuckDB connection.") - return VersusConn(conn_candidate) - - -def validate_columns_exist( - by_columns: Iterable[str], - handles: Mapping[str, _TableHandle], - table_id: Tuple[str, str], -) -> None: - missing_a = [col for col in by_columns if col not in handles[table_id[0]].columns] - missing_b = [col for col in by_columns if col not in handles[table_id[1]].columns] - if missing_a: - raise ComparisonError( - f"`by` columns not found in `{table_id[0]}`: {', '.join(missing_a)}" - ) - if missing_b: - raise ComparisonError( - f"`by` columns not found in `{table_id[1]}`: {', '.join(missing_b)}" - ) - - -def validate_type_compatibility( - handles: Mapping[str, _TableHandle], - table_id: Tuple[str, str], -) -> None: - shared = set(handles[table_id[0]].columns) & set(handles[table_id[1]].columns) - for column in shared: - type_a = handles[table_id[0]].types.get(column) - type_b = handles[table_id[1]].types.get(column) - if type_a != type_b: - raise ComparisonError( - f"`coerce=False` requires compatible types. Column `{column}` has types `{type_a}` vs `{type_b}`." - ) - - -def validate_columns(columns: Sequence[str], label: str) -> None: - if not all(isinstance(column, str) for column in columns): - raise ComparisonError(f"`{label}` must have string column names") - counts = Counter(columns) - duplicates = [name for name, count in counts.items() if count > 1] - if duplicates: - dupes = ", ".join(duplicates) - raise ComparisonError(f"`{label}` has duplicate column names: {dupes}") - - -def validate_tables( - conn: VersusConn, - handles: Mapping[str, _TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], - *, - coerce: bool, -) -> None: - validate_columns_exist(by_columns, handles, table_id) - for identifier in table_id: - validate_columns(handles[identifier].columns, identifier) - if not coerce: - validate_type_compatibility(handles, table_id) - for identifier in table_id: - assert_unique_by(conn, handles[identifier], by_columns, identifier) - - -def assert_unique_by( - conn: VersusConn, - handle: _TableHandle, - by_columns: List[str], - identifier: str, -) -> None: - cols = select_cols(by_columns, alias="t") - sql = f""" - SELECT - {cols}, - COUNT(*) AS n - FROM - {table_ref(handle)} AS t - GROUP BY - {cols} - HAVING - COUNT(*) > 1 - LIMIT - 1 - """ - rel = run_sql(conn, sql) - rows = rel.fetchall() - if rows: - first = rows[0] - values = ", ".join(f"{col}={first[i]!r}" for i, col in enumerate(by_columns)) - raise ComparisonError( - f"`{identifier}` has more than one row for by values ({values})" - ) - - -# --------------- Input validation and normalization -def validate_table_id(table_id: Tuple[str, str]) -> Tuple[str, str]: - if ( - not isinstance(table_id, (tuple, list)) - or len(table_id) != 2 - or not all(isinstance(val, str) for val in table_id) - ): - raise ComparisonError("`table_id` must be a tuple of two strings") - first, second = table_id[0], table_id[1] - if not first.strip() or not second.strip(): - raise ComparisonError("Entries of `table_id` must be non-empty strings") - if first == second: - raise ComparisonError("Entries of `table_id` must be distinct") - return (first, second) - - -def normalize_column_list( - columns: Sequence[str], - arg_name: str, - *, - allow_empty: bool, -) -> List[str]: - if isinstance(columns, str): - parsed = [columns] - else: - try: - parsed = list(columns) - except TypeError as exc: - raise ComparisonError( - f"`{arg_name}` must be a sequence of column names" - ) from exc - if not parsed and not allow_empty: - raise ComparisonError(f"`{arg_name}` must contain at least one column") - if not all(isinstance(item, str) for item in parsed): - raise ComparisonError(f"`{arg_name}` must only contain strings") - return parsed - - -def normalize_table_arg(comparison: "Comparison", table: str) -> str: - if table not in comparison.table_id: - allowed = ", ".join(comparison.table_id) - raise ComparisonError(f"`table` must be one of: {allowed}") - return table - - -def normalize_single_column(column: str) -> str: - if isinstance(column, str): - return column - raise ComparisonError("`column` must be a column name") - - -def resolve_column_list( - comparison: "Comparison", - columns: Optional[Sequence[str]], - *, - allow_empty: bool = True, -) -> List[str]: - if columns is None: - parsed = comparison.common_columns[:] - else: - cols = normalize_column_list(columns, "column", allow_empty=True) - if not cols: - raise ComparisonError("`columns` must select at least one column") - missing = [col for col in cols if col not in comparison.common_columns] - if missing: - raise ComparisonError( - f"Columns not part of the comparison: {', '.join(missing)}" - ) - parsed = cols - if not parsed and not allow_empty: - raise ComparisonError("`columns` must select at least one column") - return parsed - - -def assert_column_allowed(comparison: "Comparison", column: str, func: str) -> None: - if column not in comparison.common_columns: - raise ComparisonError( - f"`{func}` can only reference columns in both tables: {column}" - ) - - -# --------------- Input registration and metadata -def build_table_handle( - conn: VersusConn, - source: _Input, - label: str, - *, - connection_supplied: bool, -) -> _TableHandle: - name = f"__versus_{label}_{uuid.uuid4().hex}" - if isinstance(source, duckdb.DuckDBPyRelation): - return _build_table_handle_from_relation( - conn, - source, - label, - name=name, - connection_supplied=connection_supplied, - ) - if isinstance(source, str): - raise ComparisonError( - "String inputs are not supported. Pass a DuckDB relation or pandas/polars " - "DataFrame." - ) - return _build_table_handle_from_frame(conn, source, label, name=name) - - -def _build_table_handle_from_relation( - conn: VersusConn, - source: duckdb.DuckDBPyRelation, - label: str, - *, - name: str, - connection_supplied: bool, -) -> _TableHandle: - validate_columns(source.columns, label) - source_sql = source.sql_query() - display = getattr(source, "alias", "relation") - assert_relation_connection(conn, source, label, connection_supplied) - try: - columns, types = describe_source(conn, source_sql, is_identifier=False) - except duckdb.Error as exc: - raise_relation_connection_error(label, connection_supplied, exc) - row_count = resolve_row_count(conn, source, source_sql, is_identifier=False) - relation = conn.sql(source_sql) - return _TableHandle( - name=name, - display=display, - relation=relation, - columns=columns, - types=types, - source_sql=source_sql, - source_is_identifier=False, - row_count=row_count, - ) - - -def _build_table_handle_from_frame( - conn: VersusConn, - source: _Input, - label: str, - *, - name: str, -) -> _TableHandle: - source_columns = getattr(source, "columns", None) - if source_columns is not None: - validate_columns(list(source_columns), label) - try: - conn.register(name, source) - except Exception as exc: - raise ComparisonError( - "Inputs must be DuckDB relations or pandas/polars DataFrames." - ) from exc - conn.versus.views.append(name) - source_sql = name - columns, types = describe_source(conn, source_sql, is_identifier=True) - row_count = resolve_row_count(conn, source, source_sql, is_identifier=True) - relation = conn.table(name) - return _TableHandle( - name=name, - display=type(source).__name__, - relation=relation, - columns=columns, - types=types, - source_sql=source_sql, - source_is_identifier=True, - row_count=row_count, - ) - - -def describe_source( - conn: VersusConn, - source_sql: str, - *, - is_identifier: bool, -) -> Tuple[List[str], Dict[str, str]]: - source_ref = source_ref_for_sql(source_sql, is_identifier) - rel = run_sql(conn, f"DESCRIBE SELECT * FROM {source_ref}") - rows = rel.fetchall() - columns = [row[0] for row in rows] - types = {row[0]: row[1] for row in rows} - return columns, types - - -def source_ref_for_sql(source_sql: str, is_identifier: bool) -> str: - return ident(source_sql) if is_identifier else f"({source_sql})" - - -def resolve_row_count( - conn: VersusConn, - source: _Input, - source_sql: str, - *, - is_identifier: bool, -) -> int: - frame_row_count = row_count_from_frame(source) - if frame_row_count is not None: - return frame_row_count - source_ref = source_ref_for_sql(source_sql, is_identifier) - row = run_sql(conn, f"SELECT COUNT(*) FROM {source_ref}").fetchone() - assert row is not None and isinstance(row[0], int) - return row[0] - - -def row_count_from_frame(source: _Input) -> Optional[int]: - module = type(source).__module__ - if module.startswith("pandas"): - return int(cast("pandas.DataFrame", source).shape[0]) - if module.startswith("polars"): - return int(cast("polars.DataFrame", source).height) - return None - - -def raise_relation_connection_error( - label: str, - connection_supplied: bool, - exc: Exception, -) -> None: - arg_name = f"table_{label}" - if connection_supplied: - hint = ( - f"`{arg_name}` appears to be bound to a different DuckDB " - "connection than the one passed to `compare()`. Pass the same " - "connection that created the relations via `connection=...`." - ) - else: - hint = ( - f"`{arg_name}` appears to be bound to a non-default DuckDB " - "connection. Pass that connection to `compare()` via " - "`connection=...`." - ) - raise ComparisonError(hint) from exc - - -def assert_relation_connection( - conn: VersusConn, - relation: duckdb.DuckDBPyRelation, - label: str, - connection_supplied: bool, -) -> None: - probe_name = f"__versus_probe_{uuid.uuid4().hex}" - try: - conn.register(probe_name, relation) - except Exception as exc: - raise_relation_connection_error(label, connection_supplied, exc) - else: - conn.unregister(probe_name) - - -# --------------- SQL builder helpers -def ident(name: str) -> str: - escaped = name.replace('"', '""') - return f'"{escaped}"' - - -def col(alias: str, column: str) -> str: - return f"{alias}.{ident(column)}" - - -def table_ref(handle: _TableHandle) -> str: - if handle.source_is_identifier: - return ident(handle.source_sql) - return f"({handle.source_sql})" - - -def select_cols(columns: Sequence[str], alias: Optional[str] = None) -> str: - if not columns: - raise ComparisonError("Column list must be non-empty") - if alias is None: - return ", ".join(ident(column) for column in columns) - return ", ".join(col(alias, column) for column in columns) - - -def join_condition(by_columns: List[str], left_alias: str, right_alias: str) -> str: - comparisons = [ - f"{col(left_alias, column)} IS NOT DISTINCT FROM {col(right_alias, column)}" - for column in by_columns - ] - return " AND ".join(comparisons) if comparisons else "TRUE" - - -def inputs_join_sql( - handles: Mapping[str, _TableHandle], - table_id: Tuple[str, str], - by_columns: List[str], -) -> str: - join_condition_sql = join_condition(by_columns, "a", "b") - return ( - f"{table_ref(handles[table_id[0]])} AS a\n" - f" INNER JOIN {table_ref(handles[table_id[1]])} AS b\n" - f" ON {join_condition_sql}" - ) - - -def diff_predicate( - column: str, allow_both_na: bool, left_alias: str, right_alias: str -) -> str: - left = col(left_alias, column) - right = col(right_alias, column) - if allow_both_na: - return f"{left} IS DISTINCT FROM {right}" - return f"(({left} IS NULL AND {right} IS NULL) OR {left} IS DISTINCT FROM {right})" - - -def sql_literal(value: Any) -> str: - if value is None: - return "NULL" - if isinstance(value, str): - escaped = value.replace("'", "''") - return f"'{escaped}'" - if isinstance(value, bool): - return "TRUE" if value else "FALSE" - return str(value) - - -# --------------- Comparison-specific SQL assembly -def require_diff_table( - comparison: "Comparison", -) -> duckdb.DuckDBPyRelation: - diff_table = comparison.diff_table - if diff_table is None: - raise ComparisonError("Diff table is only available for materialize='all'.") - return diff_table - - -def collect_diff_keys(comparison: "Comparison", columns: Sequence[str]) -> str: - diff_table = require_diff_table(comparison) - diff_table_sql = diff_table.sql_query() - by_cols = select_cols(comparison.by_columns, alias="diffs") - predicate = " OR ".join(f"diffs.{ident(column)}" for column in columns) - return f""" - SELECT - {by_cols} - FROM - ({diff_table_sql}) AS diffs - WHERE - {predicate} - """ - - -def fetch_rows_by_keys( - comparison: "Comparison", - table: str, - key_sql: str, - columns: Optional[Sequence[str]] = None, -) -> duckdb.DuckDBPyRelation: - if columns is None: - select_cols_sql = "base.*" - else: - if not columns: - raise ComparisonError("Column list must be non-empty") - select_cols_sql = select_cols(columns, alias="base") - join_condition_sql = join_condition(comparison.by_columns, "keys", "base") - sql = f""" - SELECT - {select_cols_sql} - FROM - ({key_sql}) AS keys - JOIN {table_ref(comparison._handles[table])} AS base - ON {join_condition_sql} - """ - return run_sql(comparison.connection, sql) - - -# --------------- Relation utilities -def run_sql( - conn: Union[VersusConn, duckdb.DuckDBPyConnection], - sql: str, -) -> duckdb.DuckDBPyRelation: - return conn.sql(sql) - - -def relation_is_empty( - relation: Union[duckdb.DuckDBPyRelation, SummaryRelation], -) -> bool: - return relation.limit(1).fetchone() is None - - -def diff_lookup_from_intersection( - relation: duckdb.DuckDBPyRelation, -) -> Dict[str, int]: - rows = relation.fetchall() - return {row[0]: int(row[1]) for row in rows} - - -def unmatched_lookup_from_rows( - relation: duckdb.DuckDBPyRelation, -) -> Dict[str, int]: - rows = relation.fetchall() - return {row[0]: int(row[1]) for row in rows} - - -def rows_relation_sql( - rows: Sequence[Sequence[Any]], schema: Sequence[Tuple[str, str]] -) -> str: - if not rows: - select_list = ", ".join( - f"CAST(NULL AS {dtype}) AS {ident(name)}" for name, dtype in schema - ) - return f"SELECT {select_list} LIMIT 0" - value_rows = [ - "(" + ", ".join(sql_literal(value) for value in row) + ")" for row in rows - ] - alias_cols = ", ".join(f"col{i}" for i in range(len(schema))) - select_list = ", ".join( - f"CAST(col{i} AS {dtype}) AS {ident(name)}" - for i, (name, dtype) in enumerate(schema) - ) - return ( - f"SELECT {select_list} FROM (VALUES {', '.join(value_rows)}) AS v({alias_cols})" - ) - - -def materialize_temp_table(conn: VersusConn, sql: str) -> str: - name = f"__versus_table_{uuid.uuid4().hex}" - conn.execute(f"CREATE OR REPLACE TEMP TABLE {ident(name)} AS {sql}") - return name - - -def finalize_relation( - conn: VersusConn, - sql: str, - materialize: bool, -) -> duckdb.DuckDBPyRelation: - if not materialize: - return conn.sql(sql) - table = materialize_temp_table(conn, sql) - conn.versus.temp_tables.append(table) - return conn.sql(f"SELECT * FROM {ident(table)}") - - -def build_rows_relation( - conn: VersusConn, - rows: Sequence[Sequence[Any]], - schema: Sequence[Tuple[str, str]], - materialize: bool, -) -> duckdb.DuckDBPyRelation: - sql = rows_relation_sql(rows, schema) - return finalize_relation(conn, sql, materialize) - - -def table_count(relation: Union[duckdb.DuckDBPyRelation, _TableHandle]) -> int: - if isinstance(relation, _TableHandle): - return relation.row_count - row = relation.count("*").fetchall()[0] - assert isinstance(row[0], int) - return row[0] - - -def select_zero_from_table( - comparison: "Comparison", - table: str, - columns: Optional[Sequence[str]] = None, -) -> duckdb.DuckDBPyRelation: - handle = comparison._handles[table] - if columns is None: - sql = f"SELECT * FROM {table_ref(handle)} LIMIT 0" - return run_sql(comparison.connection, sql) - if not columns: - raise ComparisonError("Column list must be non-empty") - select_cols_sql = select_cols(columns) - sql = f"SELECT {select_cols_sql} FROM {table_ref(handle)} LIMIT 0" - return run_sql(comparison.connection, sql) diff --git a/python/versus/comparison/_inputs.py b/python/versus/comparison/_inputs.py new file mode 100644 index 0000000..300ad1e --- /dev/null +++ b/python/versus/comparison/_inputs.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, cast + +import duckdb + +from ._exceptions import ComparisonError +from ._sql import ident, run_sql +from ._types import _Input, _TableHandle, VersusConn +from ._validation import validate_columns + +if TYPE_CHECKING: # pragma: no cover + import pandas + import polars + + +def build_table_handle( + conn: VersusConn, + source: _Input, + label: str, + *, + connection_supplied: bool, +) -> _TableHandle: + name = f"__versus_{label}_{uuid.uuid4().hex}" + if isinstance(source, duckdb.DuckDBPyRelation): + return build_table_handle_from_relation( + conn, + source, + label, + name=name, + connection_supplied=connection_supplied, + ) + if isinstance(source, str): + raise ComparisonError( + "String inputs are not supported. Pass a DuckDB relation or pandas/polars " + "DataFrame." + ) + return build_table_handle_from_frame(conn, source, label, name=name) + + +def build_table_handle_from_relation( + conn: VersusConn, + source: duckdb.DuckDBPyRelation, + label: str, + *, + name: str, + connection_supplied: bool, +) -> _TableHandle: + validate_columns(source.columns, label) + source_sql = source.sql_query() + display = getattr(source, "alias", "relation") + assert_relation_connection(conn, source, label, connection_supplied) + try: + columns, types = describe_source(conn, source_sql, is_identifier=False) + except duckdb.Error as exc: + raise_relation_connection_error(label, connection_supplied, exc) + row_count = resolve_row_count(conn, source, source_sql, is_identifier=False) + relation = conn.sql(source_sql) + return _TableHandle( + name=name, + display=display, + relation=relation, + columns=columns, + types=types, + source_sql=source_sql, + source_is_identifier=False, + row_count=row_count, + ) + + +def build_table_handle_from_frame( + conn: VersusConn, + source: _Input, + label: str, + *, + name: str, +) -> _TableHandle: + source_columns = getattr(source, "columns", None) + if source_columns is not None: + validate_columns(list(source_columns), label) + try: + conn.register(name, source) + except Exception as exc: + raise ComparisonError( + "Inputs must be DuckDB relations or pandas/polars DataFrames." + ) from exc + conn.versus.views.append(name) + source_sql = name + columns, types = describe_source(conn, source_sql, is_identifier=True) + row_count = resolve_row_count(conn, source, source_sql, is_identifier=True) + relation = conn.table(name) + return _TableHandle( + name=name, + display=type(source).__name__, + relation=relation, + columns=columns, + types=types, + source_sql=source_sql, + source_is_identifier=True, + row_count=row_count, + ) + + +def describe_source( + conn: VersusConn, + source_sql: str, + *, + is_identifier: bool, +) -> Tuple[List[str], Dict[str, str]]: + source_ref = source_ref_for_sql(source_sql, is_identifier) + rel = run_sql(conn, f"DESCRIBE SELECT * FROM {source_ref}") + rows = rel.fetchall() + columns = [row[0] for row in rows] + types = {row[0]: row[1] for row in rows} + return columns, types + + +def source_ref_for_sql(source_sql: str, is_identifier: bool) -> str: + return ident(source_sql) if is_identifier else f"({source_sql})" + + +def resolve_row_count( + conn: VersusConn, + source: _Input, + source_sql: str, + *, + is_identifier: bool, +) -> int: + frame_row_count = row_count_from_frame(source) + if frame_row_count is not None: + return frame_row_count + source_ref = source_ref_for_sql(source_sql, is_identifier) + row = run_sql(conn, f"SELECT COUNT(*) FROM {source_ref}").fetchone() + assert row is not None and isinstance(row[0], int) + return row[0] + + +def row_count_from_frame(source: _Input) -> Optional[int]: + module = type(source).__module__ + if module.startswith("pandas"): + return int(cast("pandas.DataFrame", source).shape[0]) + if module.startswith("polars"): + return int(cast("polars.DataFrame", source).height) + return None + + +def raise_relation_connection_error( + label: str, + connection_supplied: bool, + exc: Exception, +) -> None: + arg_name = f"table_{label}" + if connection_supplied: + hint = ( + f"`{arg_name}` appears to be bound to a different DuckDB " + "connection than the one passed to `compare()`. Pass the same " + "connection that created the relations via `connection=...`." + ) + else: + hint = ( + f"`{arg_name}` appears to be bound to a non-default DuckDB " + "connection. Pass that connection to `compare()` via " + "`connection=...`." + ) + raise ComparisonError(hint) from exc + + +def assert_relation_connection( + conn: VersusConn, + relation: duckdb.DuckDBPyRelation, + label: str, + connection_supplied: bool, +) -> None: + probe_name = f"__versus_probe_{uuid.uuid4().hex}" + try: + conn.register(probe_name, relation) + except Exception as exc: + raise_relation_connection_error(label, connection_supplied, exc) + else: + conn.unregister(probe_name) diff --git a/python/versus/comparison/_relations.py b/python/versus/comparison/_relations.py new file mode 100644 index 0000000..5bfc3f6 --- /dev/null +++ b/python/versus/comparison/_relations.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from typing import Dict, Union + +import duckdb + +from ._summary import SummaryRelation +from ._types import _TableHandle + + +def relation_is_empty( + relation: Union[duckdb.DuckDBPyRelation, SummaryRelation], +) -> bool: + return relation.limit(1).fetchone() is None + + +def diff_lookup_from_intersection( + relation: duckdb.DuckDBPyRelation, +) -> Dict[str, int]: + rows = relation.fetchall() + return {row[0]: int(row[1]) for row in rows} + + +def unmatched_lookup_from_rows( + relation: duckdb.DuckDBPyRelation, +) -> Dict[str, int]: + rows = relation.fetchall() + return {row[0]: int(row[1]) for row in rows} + + +def table_count(relation: Union[duckdb.DuckDBPyRelation, _TableHandle]) -> int: + if isinstance(relation, _TableHandle): + return relation.row_count + row = relation.count("*").fetchall()[0] + assert isinstance(row[0], int) + return row[0] diff --git a/python/versus/comparison/_slices.py b/python/versus/comparison/_slices.py index 74e443a..b4db32a 100644 --- a/python/versus/comparison/_slices.py +++ b/python/versus/comparison/_slices.py @@ -7,7 +7,7 @@ from . import _helpers as h if TYPE_CHECKING: # pragma: no cover - from ._core import Comparison + from .comparison import Comparison def slice_diffs( diff --git a/python/versus/comparison/_sql.py b/python/versus/comparison/_sql.py new file mode 100644 index 0000000..7387a5e --- /dev/null +++ b/python/versus/comparison/_sql.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Sequence, Tuple, Union + +import duckdb + +from ._exceptions import ComparisonError +from ._types import _TableHandle, VersusConn + +if TYPE_CHECKING: # pragma: no cover + from .comparison import Comparison + + +def ident(name: str) -> str: + escaped = name.replace('"', '""') + return f'"{escaped}"' + + +def col(alias: str, column: str) -> str: + return f"{alias}.{ident(column)}" + + +def table_ref(handle: _TableHandle) -> str: + if handle.source_is_identifier: + return ident(handle.source_sql) + return f"({handle.source_sql})" + + +def select_cols(columns: Sequence[str], alias: Optional[str] = None) -> str: + if not columns: + raise ComparisonError("Column list must be non-empty") + if alias is None: + return ", ".join(ident(column) for column in columns) + return ", ".join(col(alias, column) for column in columns) + + +def join_condition(by_columns: List[str], left_alias: str, right_alias: str) -> str: + comparisons = [ + f"{col(left_alias, column)} IS NOT DISTINCT FROM {col(right_alias, column)}" + for column in by_columns + ] + return " AND ".join(comparisons) if comparisons else "TRUE" + + +def inputs_join_sql( + handles: Mapping[str, _TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], +) -> str: + join_condition_sql = join_condition(by_columns, "a", "b") + return ( + f"{table_ref(handles[table_id[0]])} AS a\n" + f" INNER JOIN {table_ref(handles[table_id[1]])} AS b\n" + f" ON {join_condition_sql}" + ) + + +def diff_predicate( + column: str, allow_both_na: bool, left_alias: str, right_alias: str +) -> str: + left = col(left_alias, column) + right = col(right_alias, column) + if allow_both_na: + return f"{left} IS DISTINCT FROM {right}" + return f"(({left} IS NULL AND {right} IS NULL) OR {left} IS DISTINCT FROM {right})" + + +def sql_literal(value: Any) -> str: + if value is None: + return "NULL" + if isinstance(value, str): + escaped = value.replace("'", "''") + return f"'{escaped}'" + if isinstance(value, bool): + return "TRUE" if value else "FALSE" + return str(value) + + +def run_sql( + conn: Union[VersusConn, duckdb.DuckDBPyConnection], + sql: str, +) -> duckdb.DuckDBPyRelation: + return conn.sql(sql) + + +def require_diff_table( + comparison: "Comparison", +) -> duckdb.DuckDBPyRelation: + diff_table = comparison.diff_table + if diff_table is None: + raise ComparisonError("Diff table is only available for materialize='all'.") + return diff_table + + +def collect_diff_keys(comparison: "Comparison", columns: Sequence[str]) -> str: + diff_table = require_diff_table(comparison) + diff_table_sql = diff_table.sql_query() + by_cols = select_cols(comparison.by_columns, alias="diffs") + predicate = " OR ".join(f"diffs.{ident(column)}" for column in columns) + return f""" + SELECT + {by_cols} + FROM + ({diff_table_sql}) AS diffs + WHERE + {predicate} + """ + + +def fetch_rows_by_keys( + comparison: "Comparison", + table: str, + key_sql: str, + columns: Optional[Sequence[str]] = None, +) -> duckdb.DuckDBPyRelation: + if columns is None: + select_cols_sql = "base.*" + else: + if not columns: + raise ComparisonError("Column list must be non-empty") + select_cols_sql = select_cols(columns, alias="base") + join_condition_sql = join_condition(comparison.by_columns, "keys", "base") + sql = f""" + SELECT + {select_cols_sql} + FROM + ({key_sql}) AS keys + JOIN {table_ref(comparison._handles[table])} AS base + ON {join_condition_sql} + """ + return run_sql(comparison.connection, sql) + + +def select_zero_from_table( + comparison: "Comparison", + table: str, + columns: Optional[Sequence[str]] = None, +) -> duckdb.DuckDBPyRelation: + handle = comparison._handles[table] + if columns is None: + sql = f"SELECT * FROM {table_ref(handle)} LIMIT 0" + return run_sql(comparison.connection, sql) + if not columns: + raise ComparisonError("Column list must be non-empty") + select_cols_sql = select_cols(columns) + sql = f"SELECT {select_cols_sql} FROM {table_ref(handle)} LIMIT 0" + return run_sql(comparison.connection, sql) diff --git a/python/versus/comparison/_summary.py b/python/versus/comparison/_summary.py new file mode 100644 index 0000000..42c1892 --- /dev/null +++ b/python/versus/comparison/_summary.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import uuid +from typing import Any, Callable, Optional, Sequence, Tuple, cast + +import duckdb + +from ._sql import ident, sql_literal +from ._types import VersusConn + + +class SummaryRelation: + def __init__( + self, + conn: VersusConn, + relation: duckdb.DuckDBPyRelation, + *, + materialized: bool, + on_materialize: Optional[Callable[[duckdb.DuckDBPyRelation], None]] = None, + ) -> None: + self._conn = conn + self.relation = relation + self.materialized = materialized + self._on_materialize = on_materialize + if self.materialized and self._on_materialize is not None: + self._on_materialize(self.relation) + + def materialize(self) -> None: + if self.materialized: + return + self.relation = finalize_relation( + self._conn, self.relation.sql_query(), materialize=True + ) + self.materialized = True + if self._on_materialize is not None: + self._on_materialize(self.relation) + + def __getattr__(self, name: str) -> Any: + return getattr(self.relation, name) + + def __repr__(self) -> str: + self.materialize() + return repr(self.relation) + + def __str__(self) -> str: + self.materialize() + return str(self.relation) + + def __iter__(self) -> Any: + return iter(cast(Any, self.relation)) + + +def rows_relation_sql( + rows: Sequence[Sequence[Any]], schema: Sequence[Tuple[str, str]] +) -> str: + if not rows: + select_list = ", ".join( + f"CAST(NULL AS {dtype}) AS {ident(name)}" for name, dtype in schema + ) + return f"SELECT {select_list} LIMIT 0" + value_rows = [ + "(" + ", ".join(sql_literal(value) for value in row) + ")" for row in rows + ] + alias_cols = ", ".join(f"col{i}" for i in range(len(schema))) + select_list = ", ".join( + f"CAST(col{i} AS {dtype}) AS {ident(name)}" + for i, (name, dtype) in enumerate(schema) + ) + return ( + f"SELECT {select_list} FROM (VALUES {', '.join(value_rows)}) AS v({alias_cols})" + ) + + +def materialize_temp_table(conn: VersusConn, sql: str) -> str: + name = f"__versus_table_{uuid.uuid4().hex}" + conn.execute(f"CREATE OR REPLACE TEMP TABLE {ident(name)} AS {sql}") + return name + + +def finalize_relation( + conn: VersusConn, + sql: str, + materialize: bool, +) -> duckdb.DuckDBPyRelation: + if not materialize: + return conn.sql(sql) + table = materialize_temp_table(conn, sql) + conn.versus.temp_tables.append(table) + return conn.sql(f"SELECT * FROM {ident(table)}") + + +def build_rows_relation( + conn: VersusConn, + rows: Sequence[Sequence[Any]], + schema: Sequence[Tuple[str, str]], + materialize: bool, +) -> duckdb.DuckDBPyRelation: + sql = rows_relation_sql(rows, schema) + return finalize_relation(conn, sql, materialize) diff --git a/python/versus/comparison/_types.py b/python/versus/comparison/_types.py new file mode 100644 index 0000000..4759132 --- /dev/null +++ b/python/versus/comparison/_types.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +import duckdb + +if TYPE_CHECKING: # pragma: no cover + import pandas + import polars + +try: + from typing import TypeAlias +except ImportError: # pragma: no cover - Python < 3.10 + from typing_extensions import TypeAlias + +_Input: TypeAlias = Union[ + duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame" +] + + +@dataclass +class _TableHandle: + name: str + display: str + relation: duckdb.DuckDBPyRelation + columns: List[str] + types: Dict[str, str] + source_sql: str + source_is_identifier: bool + row_count: int + + def __getattr__(self, name: str) -> Any: + return getattr(self.relation, name) + + +@dataclass +class VersusState: + temp_tables: List[str] + views: List[str] + + +class VersusConn: + def __init__( + self, + connection: duckdb.DuckDBPyConnection, + *, + temp_tables: Optional[List[str]] = None, + views: Optional[List[str]] = None, + ) -> None: + self.raw_connection = connection + self.versus = VersusState( + temp_tables if temp_tables is not None else [], + views if views is not None else [], + ) + + def __getattr__(self, name: str) -> Any: + return getattr(self.raw_connection, name) diff --git a/python/versus/comparison/_validation.py b/python/versus/comparison/_validation.py new file mode 100644 index 0000000..ffb7dec --- /dev/null +++ b/python/versus/comparison/_validation.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from collections import Counter +from typing import TYPE_CHECKING, Iterable, List, Mapping, Optional, Sequence, Tuple + +import duckdb + +from ._exceptions import ComparisonError +from ._sql import run_sql, select_cols, table_ref +from ._types import _TableHandle, VersusConn + +if TYPE_CHECKING: # pragma: no cover + from .comparison import Comparison + + +def resolve_materialize(materialize: str) -> Tuple[bool, bool]: + if not isinstance(materialize, str) or materialize not in { + "all", + "summary", + "none", + }: + raise ComparisonError("`materialize` must be one of: 'all', 'summary', 'none'") + materialize_summary = materialize in {"all", "summary"} + materialize_keys = materialize == "all" + return materialize_summary, materialize_keys + + +def resolve_connection( + connection: Optional[duckdb.DuckDBPyConnection], +) -> VersusConn: + if connection is not None: + conn_candidate = connection + else: + default_conn = duckdb.default_connection + conn_candidate = default_conn() if callable(default_conn) else default_conn + if not isinstance(conn_candidate, duckdb.DuckDBPyConnection): + raise ComparisonError("`connection` must be a DuckDB connection.") + return VersusConn(conn_candidate) + + +def validate_columns_exist( + by_columns: Iterable[str], + handles: Mapping[str, _TableHandle], + table_id: Tuple[str, str], +) -> None: + missing_a = [col for col in by_columns if col not in handles[table_id[0]].columns] + missing_b = [col for col in by_columns if col not in handles[table_id[1]].columns] + if missing_a: + raise ComparisonError( + f"`by` columns not found in `{table_id[0]}`: {', '.join(missing_a)}" + ) + if missing_b: + raise ComparisonError( + f"`by` columns not found in `{table_id[1]}`: {', '.join(missing_b)}" + ) + + +def validate_type_compatibility( + handles: Mapping[str, _TableHandle], + table_id: Tuple[str, str], +) -> None: + shared = set(handles[table_id[0]].columns) & set(handles[table_id[1]].columns) + for column in shared: + type_a = handles[table_id[0]].types.get(column) + type_b = handles[table_id[1]].types.get(column) + if type_a != type_b: + raise ComparisonError( + f"`coerce=False` requires compatible types. Column `{column}` has types `{type_a}` vs `{type_b}`." + ) + + +def validate_columns(columns: Sequence[str], label: str) -> None: + if not all(isinstance(column, str) for column in columns): + raise ComparisonError(f"`{label}` must have string column names") + counts = Counter(columns) + duplicates = [name for name, count in counts.items() if count > 1] + if duplicates: + dupes = ", ".join(duplicates) + raise ComparisonError(f"`{label}` has duplicate column names: {dupes}") + + +def validate_tables( + conn: VersusConn, + handles: Mapping[str, _TableHandle], + table_id: Tuple[str, str], + by_columns: List[str], + *, + coerce: bool, +) -> None: + validate_columns_exist(by_columns, handles, table_id) + for identifier in table_id: + validate_columns(handles[identifier].columns, identifier) + if not coerce: + validate_type_compatibility(handles, table_id) + for identifier in table_id: + assert_unique_by(conn, handles[identifier], by_columns, identifier) + + +def assert_unique_by( + conn: VersusConn, + handle: _TableHandle, + by_columns: List[str], + identifier: str, +) -> None: + cols = select_cols(by_columns, alias="t") + sql = f""" + SELECT + {cols}, + COUNT(*) AS n + FROM + {table_ref(handle)} AS t + GROUP BY + {cols} + HAVING + COUNT(*) > 1 + LIMIT + 1 + """ + rel = run_sql(conn, sql) + rows = rel.fetchall() + if rows: + first = rows[0] + values = ", ".join(f"{col}={first[i]!r}" for i, col in enumerate(by_columns)) + raise ComparisonError( + f"`{identifier}` has more than one row for by values ({values})" + ) + + +def validate_table_id(table_id: Tuple[str, str]) -> Tuple[str, str]: + if ( + not isinstance(table_id, (tuple, list)) + or len(table_id) != 2 + or not all(isinstance(val, str) for val in table_id) + ): + raise ComparisonError("`table_id` must be a tuple of two strings") + first, second = table_id[0], table_id[1] + if not first.strip() or not second.strip(): + raise ComparisonError("Entries of `table_id` must be non-empty strings") + if first == second: + raise ComparisonError("Entries of `table_id` must be distinct") + return (first, second) + + +def normalize_column_list( + columns: Sequence[str], + arg_name: str, + *, + allow_empty: bool, +) -> List[str]: + if isinstance(columns, str): + parsed = [columns] + else: + try: + parsed = list(columns) + except TypeError as exc: + raise ComparisonError( + f"`{arg_name}` must be a sequence of column names" + ) from exc + if not parsed and not allow_empty: + raise ComparisonError(f"`{arg_name}` must contain at least one column") + if not all(isinstance(item, str) for item in parsed): + raise ComparisonError(f"`{arg_name}` must only contain strings") + return parsed + + +def normalize_table_arg(comparison: "Comparison", table: str) -> str: + if table not in comparison.table_id: + allowed = ", ".join(comparison.table_id) + raise ComparisonError(f"`table` must be one of: {allowed}") + return table + + +def normalize_single_column(column: str) -> str: + if isinstance(column, str): + return column + raise ComparisonError("`column` must be a column name") + + +def resolve_column_list( + comparison: "Comparison", + columns: Optional[Sequence[str]], + *, + allow_empty: bool = True, +) -> List[str]: + if columns is None: + parsed = comparison.common_columns[:] + else: + cols = normalize_column_list(columns, "column", allow_empty=True) + if not cols: + raise ComparisonError("`columns` must select at least one column") + missing = [col for col in cols if col not in comparison.common_columns] + if missing: + raise ComparisonError( + f"Columns not part of the comparison: {', '.join(missing)}" + ) + parsed = cols + if not parsed and not allow_empty: + raise ComparisonError("`columns` must select at least one column") + return parsed + + +def assert_column_allowed(comparison: "Comparison", column: str, func: str) -> None: + if column not in comparison.common_columns: + raise ComparisonError( + f"`{func}` can only reference columns in both tables: {column}" + ) diff --git a/python/versus/comparison/_value_diffs.py b/python/versus/comparison/_value_diffs.py index b44756c..9cbd1fc 100644 --- a/python/versus/comparison/_value_diffs.py +++ b/python/versus/comparison/_value_diffs.py @@ -7,7 +7,7 @@ from . import _helpers as h if TYPE_CHECKING: # pragma: no cover - from ._core import Comparison + from .comparison import Comparison def value_diffs(comparison: "Comparison", column: str) -> duckdb.DuckDBPyRelation: diff --git a/python/versus/comparison/_weave.py b/python/versus/comparison/_weave.py index 9135194..65d8529 100644 --- a/python/versus/comparison/_weave.py +++ b/python/versus/comparison/_weave.py @@ -8,7 +8,7 @@ from . import _helpers as h if TYPE_CHECKING: # pragma: no cover - from ._core import Comparison + from .comparison import Comparison def weave_diffs_wide( diff --git a/python/versus/comparison/api.py b/python/versus/comparison/api.py new file mode 100644 index 0000000..19837f2 --- /dev/null +++ b/python/versus/comparison/api.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Dict, + List, + Mapping, + Optional, + Sequence, + Tuple, + Union, +) + +try: + from typing import Literal +except ImportError: # pragma: no cover - Python < 3.8 + from typing_extensions import Literal + +import duckdb + +from . import _frames as c +from . import _helpers as h +from .comparison import Comparison + +if TYPE_CHECKING: # pragma: no cover + import pandas + import polars + + +def compare( + table_a: Union[duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame"], + table_b: Union[duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame"], + *, + by: Sequence[str], + allow_both_na: bool = True, + coerce: bool = True, + table_id: Tuple[str, str] = ("a", "b"), + connection: Optional[duckdb.DuckDBPyConnection] = None, + materialize: Literal["all", "summary", "none"] = "all", +) -> Comparison: + """Compare two DuckDB relations by key columns. + + Parameters + ---------- + table_a, table_b : DuckDBPyRelation, pandas.DataFrame, or polars.DataFrame + DuckDB relations or pandas/polars DataFrames to compare. + by : sequence of str + Column names that uniquely identify rows. + allow_both_na : bool, default True + Whether to treat NULL/NA values as equal when both sides are missing. + coerce : bool, default True + If True, allow DuckDB to coerce compatible types. If False, require + exact type matches for shared columns. + table_id : tuple[str, str], default ("a", "b") + Labels used in outputs for the two tables. + connection : duckdb.DuckDBPyConnection, optional + DuckDB connection used to register the inputs and run queries. + materialize : {"all", "summary", "none"}, default "all" + Controls which helper tables are materialized upfront. + + Returns + ------- + Comparison + Comparison object with summary relations and diff helpers. + + Examples + -------- + >>> from versus import compare, examples + >>> comparison = compare( + ... examples.example_cars_a(), + ... examples.example_cars_b(), + ... by=["car"], + ... ) + >>> comparison.summary() + ┌────────────────┬─────────┐ + │ difference │ found │ + │ varchar │ boolean │ + ├────────────────┼─────────┤ + │ value_diffs │ true │ + │ unmatched_cols │ true │ + │ unmatched_rows │ true │ + │ type_diffs │ false │ + └────────────────┴─────────┘ + """ + materialize_summary, materialize_keys = h.resolve_materialize(materialize) + + conn = h.resolve_connection(connection) + clean_ids = h.validate_table_id(table_id) + by_columns = h.normalize_column_list(by, "by", allow_empty=False) + connection_supplied = connection is not None + handles = { + clean_ids[0]: h.build_table_handle( + conn, table_a, clean_ids[0], connection_supplied=connection_supplied + ), + clean_ids[1]: h.build_table_handle( + conn, table_b, clean_ids[1], connection_supplied=connection_supplied + ), + } + h.validate_tables(conn, handles, clean_ids, by_columns, coerce=coerce) + + tables_frame = c.build_tables_frame(conn, handles, clean_ids, materialize_summary) + by_frame = c.build_by_frame( + conn, by_columns, handles, clean_ids, materialize_summary + ) + common_all = [ + col + for col in handles[clean_ids[0]].columns + if col in handles[clean_ids[1]].columns + ] + value_columns = [col for col in common_all if col not in by_columns] + unmatched_cols = c.build_unmatched_cols( + conn, handles, clean_ids, materialize_summary + ) + diff_table = None + if materialize_keys: + diff_table = c.compute_diff_table( + conn, + handles, + clean_ids, + by_columns, + value_columns, + allow_both_na, + ) + intersection, diff_lookup = c.build_intersection_frame( + value_columns, + handles, + clean_ids, + by_columns, + allow_both_na, + diff_table, + conn, + materialize_summary, + ) + unmatched_keys = c.compute_unmatched_keys( + conn, handles, clean_ids, by_columns, materialize_keys + ) + unmatched_rows_rel, unmatched_lookup = c.compute_unmatched_rows_summary( + conn, unmatched_keys, clean_ids, materialize_summary + ) + + return Comparison( + connection=conn, + handles=handles, + table_id=clean_ids, + by_columns=by_columns, + allow_both_na=allow_both_na, + materialize_mode=materialize, + tables=tables_frame, + by=by_frame, + intersection=intersection, + unmatched_cols=unmatched_cols, + unmatched_keys=unmatched_keys, + unmatched_rows=unmatched_rows_rel, + common_columns=value_columns, + table_columns={ + identifier: handle.columns[:] for identifier, handle in handles.items() + }, + diff_table=diff_table, + diff_lookup=diff_lookup, + unmatched_lookup=unmatched_lookup, + ) diff --git a/python/versus/comparison/_core.py b/python/versus/comparison/comparison.py similarity index 80% rename from python/versus/comparison/_core.py rename to python/versus/comparison/comparison.py index d416193..ef31a89 100644 --- a/python/versus/comparison/_core.py +++ b/python/versus/comparison/comparison.py @@ -1,165 +1,12 @@ from __future__ import annotations -from typing import ( - TYPE_CHECKING, - Dict, - List, - Mapping, - Optional, - Sequence, - Tuple, - Union, -) - -try: - from typing import Literal -except ImportError: # pragma: no cover - Python < 3.8 - from typing_extensions import Literal +from typing import Dict, List, Mapping, Optional, Sequence, Tuple import duckdb -from . import _compute as c from . import _helpers as h from . import _slices, _value_diffs, _weave -if TYPE_CHECKING: # pragma: no cover - import pandas - import polars - - -def compare( - table_a: Union[duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame"], - table_b: Union[duckdb.DuckDBPyRelation, "pandas.DataFrame", "polars.DataFrame"], - *, - by: Sequence[str], - allow_both_na: bool = True, - coerce: bool = True, - table_id: Tuple[str, str] = ("a", "b"), - connection: Optional[duckdb.DuckDBPyConnection] = None, - materialize: Literal["all", "summary", "none"] = "all", -) -> Comparison: - """Compare two DuckDB relations by key columns. - - Parameters - ---------- - table_a, table_b : DuckDBPyRelation, pandas.DataFrame, or polars.DataFrame - DuckDB relations or pandas/polars DataFrames to compare. - by : sequence of str - Column names that uniquely identify rows. - allow_both_na : bool, default True - Whether to treat NULL/NA values as equal when both sides are missing. - coerce : bool, default True - If True, allow DuckDB to coerce compatible types. If False, require - exact type matches for shared columns. - table_id : tuple[str, str], default ("a", "b") - Labels used in outputs for the two tables. - connection : duckdb.DuckDBPyConnection, optional - DuckDB connection used to register the inputs and run queries. - materialize : {"all", "summary", "none"}, default "all" - Controls which helper tables are materialized upfront. - - Returns - ------- - Comparison - Comparison object with summary relations and diff helpers. - - Examples - -------- - >>> from versus import compare, examples - >>> comparison = compare( - ... examples.example_cars_a(), - ... examples.example_cars_b(), - ... by=["car"], - ... ) - >>> comparison.summary() - ┌────────────────┬─────────┐ - │ difference │ found │ - │ varchar │ boolean │ - ├────────────────┼─────────┤ - │ value_diffs │ true │ - │ unmatched_cols │ true │ - │ unmatched_rows │ true │ - │ type_diffs │ false │ - └────────────────┴─────────┘ - """ - materialize_summary, materialize_keys = h.resolve_materialize(materialize) - - conn = h.resolve_connection(connection) - clean_ids = h.validate_table_id(table_id) - by_columns = h.normalize_column_list(by, "by", allow_empty=False) - connection_supplied = connection is not None - handles = { - clean_ids[0]: h.build_table_handle( - conn, table_a, clean_ids[0], connection_supplied=connection_supplied - ), - clean_ids[1]: h.build_table_handle( - conn, table_b, clean_ids[1], connection_supplied=connection_supplied - ), - } - h.validate_tables(conn, handles, clean_ids, by_columns, coerce=coerce) - - tables_frame = c.build_tables_frame(conn, handles, clean_ids, materialize_summary) - by_frame = c.build_by_frame( - conn, by_columns, handles, clean_ids, materialize_summary - ) - common_all = [ - col - for col in handles[clean_ids[0]].columns - if col in handles[clean_ids[1]].columns - ] - value_columns = [col for col in common_all if col not in by_columns] - unmatched_cols = c.build_unmatched_cols( - conn, handles, clean_ids, materialize_summary - ) - diff_table = None - if materialize_keys: - diff_table = c.compute_diff_table( - conn, - handles, - clean_ids, - by_columns, - value_columns, - allow_both_na, - ) - intersection, diff_lookup = c.build_intersection_frame( - value_columns, - handles, - clean_ids, - by_columns, - allow_both_na, - diff_table, - conn, - materialize_summary, - ) - unmatched_keys = c.compute_unmatched_keys( - conn, handles, clean_ids, by_columns, materialize_keys - ) - unmatched_rows_rel, unmatched_lookup = c.compute_unmatched_rows_summary( - conn, unmatched_keys, clean_ids, materialize_summary - ) - - return Comparison( - connection=conn, - handles=handles, - table_id=clean_ids, - by_columns=by_columns, - allow_both_na=allow_both_na, - materialize_mode=materialize, - tables=tables_frame, - by=by_frame, - intersection=intersection, - unmatched_cols=unmatched_cols, - unmatched_keys=unmatched_keys, - unmatched_rows=unmatched_rows_rel, - common_columns=value_columns, - table_columns={ - identifier: handle.columns[:] for identifier, handle in handles.items() - }, - diff_table=diff_table, - diff_lookup=diff_lookup, - unmatched_lookup=unmatched_lookup, - ) - class Comparison: """In-memory description of how two relations differ. From 13f0fee04bdc92e8ce96328a82e97436e9eb7a21 Mon Sep 17 00:00:00 2001 From: eutwt <11261404+eutwt@users.noreply.github.com> Date: Sun, 28 Dec 2025 16:21:25 -0500 Subject: [PATCH 2/3] Restructure comparison internals --- python/versus/comparison/_compute.py | 13 --- python/versus/comparison/_frames.py | 123 ++++++++++++----------- python/versus/comparison/_helpers.py | 108 -------------------- python/versus/comparison/_inputs.py | 14 +-- python/versus/comparison/_relations.py | 10 +- python/versus/comparison/_slices.py | 45 +++++---- python/versus/comparison/_summary.py | 12 +-- python/versus/comparison/_validation.py | 8 +- python/versus/comparison/_value_diffs.py | 81 +++++++-------- python/versus/comparison/_weave.py | 84 ++++++++-------- python/versus/comparison/api.py | 33 +++--- python/versus/comparison/comparison.py | 64 ++++++------ 12 files changed, 244 insertions(+), 351 deletions(-) delete mode 100644 python/versus/comparison/_compute.py delete mode 100644 python/versus/comparison/_helpers.py diff --git a/python/versus/comparison/_compute.py b/python/versus/comparison/_compute.py deleted file mode 100644 index 5d52ec8..0000000 --- a/python/versus/comparison/_compute.py +++ /dev/null @@ -1,13 +0,0 @@ -from . import _frames as _frames - -build_tables_frame = _frames.build_tables_frame -build_by_frame = _frames.build_by_frame -build_unmatched_cols = _frames.build_unmatched_cols -build_intersection_frame = _frames.build_intersection_frame -compute_diff_table = _frames.compute_diff_table -compute_unmatched_keys = _frames.compute_unmatched_keys -compute_unmatched_rows_summary = _frames.compute_unmatched_rows_summary - -_build_empty_intersection_relation = _frames._build_empty_intersection_relation -_build_intersection_frame_with_table = _frames._build_intersection_frame_with_table -_build_intersection_frame_inline = _frames._build_intersection_frame_inline diff --git a/python/versus/comparison/_frames.py b/python/versus/comparison/_frames.py index 7efb59c..8150b6f 100644 --- a/python/versus/comparison/_frames.py +++ b/python/versus/comparison/_frames.py @@ -4,18 +4,21 @@ import duckdb -from . import _helpers as h +from . import _relations as r +from . import _sql as q +from . import _summary as s +from . import _types as t def build_tables_frame( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], + conn: t.VersusConn, + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], materialize: bool, ) -> duckdb.DuckDBPyRelation: def row_for(identifier: str) -> Tuple[str, int, int]: handle = handles[identifier] - return identifier, h.table_count(handle), len(handle.columns) + return identifier, r.table_count(handle), len(handle.columns) rows = [row_for(identifier) for identifier in table_id] schema = [ @@ -23,13 +26,13 @@ def row_for(identifier: str) -> Tuple[str, int, int]: ("nrow", "BIGINT"), ("ncol", "BIGINT"), ] - return h.build_rows_relation(conn, rows, schema, materialize) + return s.build_rows_relation(conn, rows, schema, materialize) def build_by_frame( - conn: h.VersusConn, + conn: t.VersusConn, by_columns: List[str], - handles: Mapping[str, h._TableHandle], + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], materialize: bool, ) -> duckdb.DuckDBPyRelation: @@ -47,12 +50,12 @@ def build_by_frame( (f"type_{first}", "VARCHAR"), (f"type_{second}", "VARCHAR"), ] - return h.build_rows_relation(conn, rows, schema, materialize) + return s.build_rows_relation(conn, rows, schema, materialize) def build_unmatched_cols( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], + conn: t.VersusConn, + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], materialize: bool, ) -> duckdb.DuckDBPyRelation: @@ -71,17 +74,17 @@ def build_unmatched_cols( ("column", "VARCHAR"), ("type", "VARCHAR"), ] - return h.build_rows_relation(conn, rows, schema, materialize) + return s.build_rows_relation(conn, rows, schema, materialize) def build_intersection_frame( value_columns: List[str], - handles: Mapping[str, h._TableHandle], + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], by_columns: List[str], allow_both_na: bool, diff_table: Optional[duckdb.DuckDBPyRelation], - conn: h.VersusConn, + conn: t.VersusConn, materialize: bool, ) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: if diff_table is None: @@ -100,7 +103,7 @@ def build_intersection_frame( def _build_empty_intersection_relation( - conn: h.VersusConn, + conn: t.VersusConn, table_id: Tuple[str, str], materialize: bool, ) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: @@ -111,16 +114,16 @@ def _build_empty_intersection_relation( (f"type_{first}", "VARCHAR"), (f"type_{second}", "VARCHAR"), ] - relation = h.build_rows_relation(conn, [], schema, materialize) + relation = s.build_rows_relation(conn, [], schema, materialize) return relation, {} if materialize else None def _build_intersection_frame_with_table( value_columns: List[str], - handles: Mapping[str, h._TableHandle], + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], diff_table: duckdb.DuckDBPyRelation, - conn: h.VersusConn, + conn: t.VersusConn, materialize: bool, ) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: first, second = table_id @@ -131,18 +134,18 @@ def diff_alias(column: str) -> str: return f"n_diffs_{column}" count_columns = ",\n ".join( - f"COUNT(*) FILTER (WHERE diffs.{h.ident(column)}) " - f"AS {h.ident(diff_alias(column))}" + f"COUNT(*) FILTER (WHERE diffs.{q.ident(column)}) " + f"AS {q.ident(diff_alias(column))}" for column in value_columns ) def select_for(column: str) -> str: return f""" SELECT - {h.sql_literal(column)} AS {h.ident("column")}, - counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, - {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, - {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} + {q.sql_literal(column)} AS {q.ident("column")}, + counts.{q.ident(diff_alias(column))} AS {q.ident("n_diffs")}, + {q.sql_literal(handles[first].types[column])} AS {q.ident(f"type_{first}")}, + {q.sql_literal(handles[second].types[column])} AS {q.ident(f"type_{second}")} FROM counts """ @@ -157,42 +160,42 @@ def select_for(column: str) -> str: ) {" UNION ALL ".join(select_for(column) for column in value_columns)} """ - relation = h.finalize_relation(conn, sql, materialize) + relation = s.finalize_relation(conn, sql, materialize) if not materialize: return relation, None - return relation, h.diff_lookup_from_intersection(relation) + return relation, r.diff_lookup_from_intersection(relation) def _build_intersection_frame_inline( value_columns: List[str], - handles: Mapping[str, h._TableHandle], + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], by_columns: List[str], allow_both_na: bool, - conn: h.VersusConn, + conn: t.VersusConn, materialize: bool, ) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: if not value_columns: return _build_empty_intersection_relation(conn, table_id, materialize) first, second = table_id - join_sql = h.inputs_join_sql(handles, table_id, by_columns) + join_sql = q.inputs_join_sql(handles, table_id, by_columns) def diff_alias(column: str) -> str: return f"n_diffs_{column}" count_columns = ",\n ".join( - f"COUNT(*) FILTER (WHERE {h.diff_predicate(column, allow_both_na, 'a', 'b')}) " - f"AS {h.ident(diff_alias(column))}" + f"COUNT(*) FILTER (WHERE {q.diff_predicate(column, allow_both_na, 'a', 'b')}) " + f"AS {q.ident(diff_alias(column))}" for column in value_columns ) def select_for(column: str) -> str: return f""" SELECT - {h.sql_literal(column)} AS {h.ident("column")}, - counts.{h.ident(diff_alias(column))} AS {h.ident("n_diffs")}, - {h.sql_literal(handles[first].types[column])} AS {h.ident(f"type_{first}")}, - {h.sql_literal(handles[second].types[column])} AS {h.ident(f"type_{second}")} + {q.sql_literal(column)} AS {q.ident("column")}, + counts.{q.ident(diff_alias(column))} AS {q.ident("n_diffs")}, + {q.sql_literal(handles[first].types[column])} AS {q.ident(f"type_{first}")}, + {q.sql_literal(handles[second].types[column])} AS {q.ident(f"type_{second}")} FROM counts """ @@ -206,15 +209,15 @@ def select_for(column: str) -> str: ) {" UNION ALL ".join(select_for(column) for column in value_columns)} """ - relation = h.finalize_relation(conn, sql, materialize) + relation = s.finalize_relation(conn, sql, materialize) if not materialize: return relation, None - return relation, h.diff_lookup_from_intersection(relation) + return relation, r.diff_lookup_from_intersection(relation) def compute_diff_table( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], + conn: t.VersusConn, + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], by_columns: List[str], value_columns: List[str], @@ -222,15 +225,15 @@ def compute_diff_table( ) -> duckdb.DuckDBPyRelation: if not value_columns: schema = [(column, handles[table_id[0]].types[column]) for column in by_columns] - return h.build_rows_relation(conn, [], schema, materialize=True) - join_sql = h.inputs_join_sql(handles, table_id, by_columns) - select_by = h.select_cols(by_columns, alias="a") + return s.build_rows_relation(conn, [], schema, materialize=True) + join_sql = q.inputs_join_sql(handles, table_id, by_columns) + select_by = q.select_cols(by_columns, alias="a") diff_expressions = [ - (column, h.diff_predicate(column, allow_both_na, "a", "b")) + (column, q.diff_predicate(column, allow_both_na, "a", "b")) for column in value_columns ] diff_flags = ",\n ".join( - f"{expression} AS {h.ident(column)}" for column, expression in diff_expressions + f"{expression} AS {q.ident(column)}" for column, expression in diff_expressions ) predicate = " OR ".join(expression for _, expression in diff_expressions) sql = f""" @@ -242,12 +245,12 @@ def compute_diff_table( WHERE {predicate} """ - return h.finalize_relation(conn, sql, materialize=True) + return s.finalize_relation(conn, sql, materialize=True) def compute_unmatched_keys( - conn: h.VersusConn, - handles: Mapping[str, h._TableHandle], + conn: t.VersusConn, + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], by_columns: List[str], materialize: bool, @@ -256,33 +259,33 @@ def key_part(identifier: str) -> str: other = table_id[1] if identifier == table_id[0] else table_id[0] handle_left = handles[identifier] handle_right = handles[other] - select_by = h.select_cols(by_columns, alias="left_tbl") - condition = h.join_condition(by_columns, "left_tbl", "right_tbl") + select_by = q.select_cols(by_columns, alias="left_tbl") + condition = q.join_condition(by_columns, "left_tbl", "right_tbl") return f""" SELECT - {h.sql_literal(identifier)} AS table_name, + {q.sql_literal(identifier)} AS table_name, {select_by} FROM - {h.table_ref(handle_left)} AS left_tbl - ANTI JOIN {h.table_ref(handle_right)} AS right_tbl + {q.table_ref(handle_left)} AS left_tbl + ANTI JOIN {q.table_ref(handle_right)} AS right_tbl ON {condition} """ keys_parts = [key_part(identifier) for identifier in table_id] unmatched_keys_sql = " UNION ALL ".join(keys_parts) - return h.finalize_relation(conn, unmatched_keys_sql, materialize) + return s.finalize_relation(conn, unmatched_keys_sql, materialize) def compute_unmatched_rows_summary( - conn: h.VersusConn, + conn: t.VersusConn, unmatched_keys: duckdb.DuckDBPyRelation, table_id: Tuple[str, str], materialize: bool, ) -> Tuple[duckdb.DuckDBPyRelation, Optional[Dict[str, int]]]: unmatched_keys_sql = unmatched_keys.sql_query() - table_col = h.ident("table_name") - count_col = h.ident("n_unmatched") - base_sql = h.rows_relation_sql( + table_col = q.ident("table_name") + count_col = q.ident("n_unmatched") + base_sql = s.rows_relation_sql( [(table_id[0],), (table_id[1],)], [("table_name", "VARCHAR")] ) counts_sql = f""" @@ -296,8 +299,8 @@ def compute_unmatched_rows_summary( """ order_case = ( f"CASE base.{table_col} " - f"WHEN {h.sql_literal(table_id[0])} THEN 0 " - f"WHEN {h.sql_literal(table_id[1])} THEN 1 " + f"WHEN {q.sql_literal(table_id[0])} THEN 0 " + f"WHEN {q.sql_literal(table_id[1])} THEN 1 " "ELSE 2 END" ) sql = f""" @@ -311,7 +314,7 @@ def compute_unmatched_rows_summary( ORDER BY {order_case} """ - relation = h.finalize_relation(conn, sql, materialize) + relation = s.finalize_relation(conn, sql, materialize) if not materialize: return relation, None - return relation, h.unmatched_lookup_from_rows(relation) + return relation, r.unmatched_lookup_from_rows(relation) diff --git a/python/versus/comparison/_helpers.py b/python/versus/comparison/_helpers.py deleted file mode 100644 index 54e319c..0000000 --- a/python/versus/comparison/_helpers.py +++ /dev/null @@ -1,108 +0,0 @@ -from ._exceptions import ComparisonError -from ._inputs import ( - assert_relation_connection, - build_table_handle, - build_table_handle_from_frame, - build_table_handle_from_relation, - describe_source, - raise_relation_connection_error, - resolve_row_count, - row_count_from_frame, - source_ref_for_sql, -) -from ._relations import ( - diff_lookup_from_intersection, - relation_is_empty, - table_count, - unmatched_lookup_from_rows, -) -from ._sql import ( - col, - collect_diff_keys, - diff_predicate, - fetch_rows_by_keys, - ident, - inputs_join_sql, - join_condition, - require_diff_table, - run_sql, - select_cols, - select_zero_from_table, - sql_literal, - table_ref, -) -from ._summary import ( - SummaryRelation, - build_rows_relation, - finalize_relation, - materialize_temp_table, - rows_relation_sql, -) -from ._types import _Input, _TableHandle, VersusConn, VersusState -from ._validation import ( - assert_column_allowed, - assert_unique_by, - normalize_column_list, - normalize_single_column, - normalize_table_arg, - resolve_column_list, - resolve_connection, - resolve_materialize, - validate_columns, - validate_columns_exist, - validate_table_id, - validate_tables, - validate_type_compatibility, -) - -__all__ = [ - "ComparisonError", - "_Input", - "_TableHandle", - "VersusConn", - "VersusState", - "SummaryRelation", - "resolve_materialize", - "resolve_connection", - "validate_columns_exist", - "validate_type_compatibility", - "validate_columns", - "validate_tables", - "assert_unique_by", - "validate_table_id", - "normalize_column_list", - "normalize_table_arg", - "normalize_single_column", - "resolve_column_list", - "assert_column_allowed", - "build_table_handle", - "build_table_handle_from_relation", - "build_table_handle_from_frame", - "describe_source", - "source_ref_for_sql", - "resolve_row_count", - "row_count_from_frame", - "raise_relation_connection_error", - "assert_relation_connection", - "ident", - "col", - "table_ref", - "select_cols", - "join_condition", - "inputs_join_sql", - "diff_predicate", - "sql_literal", - "run_sql", - "require_diff_table", - "collect_diff_keys", - "fetch_rows_by_keys", - "select_zero_from_table", - "relation_is_empty", - "diff_lookup_from_intersection", - "unmatched_lookup_from_rows", - "rows_relation_sql", - "materialize_temp_table", - "finalize_relation", - "build_rows_relation", - "table_count", -] diff --git a/python/versus/comparison/_inputs.py b/python/versus/comparison/_inputs.py index 300ad1e..ac173e4 100644 --- a/python/versus/comparison/_inputs.py +++ b/python/versus/comparison/_inputs.py @@ -6,9 +6,9 @@ import duckdb from ._exceptions import ComparisonError -from ._sql import ident, run_sql +from . import _sql as q +from . import _validation as v from ._types import _Input, _TableHandle, VersusConn -from ._validation import validate_columns if TYPE_CHECKING: # pragma: no cover import pandas @@ -47,7 +47,7 @@ def build_table_handle_from_relation( name: str, connection_supplied: bool, ) -> _TableHandle: - validate_columns(source.columns, label) + v.validate_columns(source.columns, label) source_sql = source.sql_query() display = getattr(source, "alias", "relation") assert_relation_connection(conn, source, label, connection_supplied) @@ -78,7 +78,7 @@ def build_table_handle_from_frame( ) -> _TableHandle: source_columns = getattr(source, "columns", None) if source_columns is not None: - validate_columns(list(source_columns), label) + v.validate_columns(list(source_columns), label) try: conn.register(name, source) except Exception as exc: @@ -109,7 +109,7 @@ def describe_source( is_identifier: bool, ) -> Tuple[List[str], Dict[str, str]]: source_ref = source_ref_for_sql(source_sql, is_identifier) - rel = run_sql(conn, f"DESCRIBE SELECT * FROM {source_ref}") + rel = q.run_sql(conn, f"DESCRIBE SELECT * FROM {source_ref}") rows = rel.fetchall() columns = [row[0] for row in rows] types = {row[0]: row[1] for row in rows} @@ -117,7 +117,7 @@ def describe_source( def source_ref_for_sql(source_sql: str, is_identifier: bool) -> str: - return ident(source_sql) if is_identifier else f"({source_sql})" + return q.ident(source_sql) if is_identifier else f"({source_sql})" def resolve_row_count( @@ -131,7 +131,7 @@ def resolve_row_count( if frame_row_count is not None: return frame_row_count source_ref = source_ref_for_sql(source_sql, is_identifier) - row = run_sql(conn, f"SELECT COUNT(*) FROM {source_ref}").fetchone() + row = q.run_sql(conn, f"SELECT COUNT(*) FROM {source_ref}").fetchone() assert row is not None and isinstance(row[0], int) return row[0] diff --git a/python/versus/comparison/_relations.py b/python/versus/comparison/_relations.py index 5bfc3f6..046f1c1 100644 --- a/python/versus/comparison/_relations.py +++ b/python/versus/comparison/_relations.py @@ -4,12 +4,12 @@ import duckdb -from ._summary import SummaryRelation -from ._types import _TableHandle +from . import _summary as s +from . import _types as t def relation_is_empty( - relation: Union[duckdb.DuckDBPyRelation, SummaryRelation], + relation: Union[duckdb.DuckDBPyRelation, s.SummaryRelation], ) -> bool: return relation.limit(1).fetchone() is None @@ -28,8 +28,8 @@ def unmatched_lookup_from_rows( return {row[0]: int(row[1]) for row in rows} -def table_count(relation: Union[duckdb.DuckDBPyRelation, _TableHandle]) -> int: - if isinstance(relation, _TableHandle): +def table_count(relation: Union[duckdb.DuckDBPyRelation, t._TableHandle]) -> int: + if isinstance(relation, t._TableHandle): return relation.row_count row = relation.count("*").fetchall()[0] assert isinstance(row[0], int) diff --git a/python/versus/comparison/_slices.py b/python/versus/comparison/_slices.py index b4db32a..53a9549 100644 --- a/python/versus/comparison/_slices.py +++ b/python/versus/comparison/_slices.py @@ -4,7 +4,8 @@ import duckdb -from . import _helpers as h +from . import _sql as q +from . import _validation as v if TYPE_CHECKING: # pragma: no cover from .comparison import Comparison @@ -15,11 +16,11 @@ def slice_diffs( table: str, columns: Optional[Sequence[str]] = None, ) -> duckdb.DuckDBPyRelation: - table_name = h.normalize_table_arg(comparison, table) - selected = h.resolve_column_list(comparison, columns) + table_name = v.normalize_table_arg(comparison, table) + selected = v.resolve_column_list(comparison, columns) diff_cols = comparison._filter_diff_columns(selected) if not diff_cols: - return h.select_zero_from_table(comparison, table_name) + return q.select_zero_from_table(comparison, table_name) if comparison._materialize_mode == "all": relation = _slice_diffs_with_keys(comparison, table_name, diff_cols) else: @@ -30,8 +31,8 @@ def slice_diffs( def _slice_diffs_with_keys( comparison: "Comparison", table_name: str, diff_cols: Sequence[str] ) -> duckdb.DuckDBPyRelation: - key_sql = h.collect_diff_keys(comparison, diff_cols) - return h.fetch_rows_by_keys(comparison, table_name, key_sql) + key_sql = q.collect_diff_keys(comparison, diff_cols) + return q.fetch_rows_by_keys(comparison, table_name, key_sql) def _slice_diffs_inline( @@ -39,11 +40,11 @@ def _slice_diffs_inline( ) -> duckdb.DuckDBPyRelation: table_a, table_b = comparison.table_id base_alias = "a" if table_name == table_a else "b" - join_sql = h.inputs_join_sql( + join_sql = q.inputs_join_sql( comparison._handles, comparison.table_id, comparison.by_columns ) predicate = " OR ".join( - h.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols + q.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols ) sql = f""" SELECT @@ -53,29 +54,29 @@ def _slice_diffs_inline( WHERE {predicate} """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def build_unmatched_keys_sql(comparison: "Comparison", table_name: str) -> str: unmatched_keys_sql = comparison.unmatched_keys.sql_query() - by_cols = h.select_cols(comparison.by_columns, alias="keys") - table_filter = f"keys.{h.ident('table_name')} = {h.sql_literal(table_name)}" + by_cols = q.select_cols(comparison.by_columns, alias="keys") + table_filter = f"keys.{q.ident('table_name')} = {q.sql_literal(table_name)}" return f"SELECT {by_cols} FROM ({unmatched_keys_sql}) AS keys WHERE {table_filter}" def slice_unmatched(comparison: "Comparison", table: str) -> duckdb.DuckDBPyRelation: - table_name = h.normalize_table_arg(comparison, table) + table_name = v.normalize_table_arg(comparison, table) unmatched_lookup = comparison._unmatched_lookup if unmatched_lookup is not None and unmatched_lookup[table_name] == 0: - return h.select_zero_from_table(comparison, table_name) + return q.select_zero_from_table(comparison, table_name) key_sql = build_unmatched_keys_sql(comparison, table_name) - return h.fetch_rows_by_keys(comparison, table_name, key_sql) + return q.fetch_rows_by_keys(comparison, table_name, key_sql) def slice_unmatched_both(comparison: "Comparison") -> duckdb.DuckDBPyRelation: out_cols = comparison.by_columns + comparison.common_columns - select_cols = h.select_cols(out_cols, alias="base") - join_condition = h.join_condition(comparison.by_columns, "keys", "base") + select_cols = q.select_cols(out_cols, alias="base") + join_condition = q.join_condition(comparison.by_columns, "keys", "base") unmatched_lookup = comparison._unmatched_lookup table_names = [ table_name @@ -88,25 +89,25 @@ def select_for(table_name: str) -> str: base_table = comparison._handles[table_name] return f""" SELECT - {h.sql_literal(table_name)} AS table_name, + {q.sql_literal(table_name)} AS table_name, {select_cols} FROM - {h.table_ref(base_table)} AS base + {q.table_ref(base_table)} AS base JOIN ({unmatched_keys_sql}) AS keys ON {join_condition} """ selects = [select_for(table_name) for table_name in table_names] if not selects: - base = h.select_zero_from_table(comparison, comparison.table_id[0], out_cols) + base = q.select_zero_from_table(comparison, comparison.table_id[0], out_cols) sql = f""" SELECT - {h.sql_literal(comparison.table_id[0])} AS table_name, - {h.select_cols(out_cols)} + {q.sql_literal(comparison.table_id[0])} AS table_name, + {q.select_cols(out_cols)} FROM base """ relation = base.query("base", sql) return relation sql = " UNION ALL ".join(selects) - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) diff --git a/python/versus/comparison/_summary.py b/python/versus/comparison/_summary.py index 42c1892..0f30d1d 100644 --- a/python/versus/comparison/_summary.py +++ b/python/versus/comparison/_summary.py @@ -5,7 +5,7 @@ import duckdb -from ._sql import ident, sql_literal +from . import _sql as q from ._types import VersusConn @@ -55,15 +55,15 @@ def rows_relation_sql( ) -> str: if not rows: select_list = ", ".join( - f"CAST(NULL AS {dtype}) AS {ident(name)}" for name, dtype in schema + f"CAST(NULL AS {dtype}) AS {q.ident(name)}" for name, dtype in schema ) return f"SELECT {select_list} LIMIT 0" value_rows = [ - "(" + ", ".join(sql_literal(value) for value in row) + ")" for row in rows + "(" + ", ".join(q.sql_literal(value) for value in row) + ")" for row in rows ] alias_cols = ", ".join(f"col{i}" for i in range(len(schema))) select_list = ", ".join( - f"CAST(col{i} AS {dtype}) AS {ident(name)}" + f"CAST(col{i} AS {dtype}) AS {q.ident(name)}" for i, (name, dtype) in enumerate(schema) ) return ( @@ -73,7 +73,7 @@ def rows_relation_sql( def materialize_temp_table(conn: VersusConn, sql: str) -> str: name = f"__versus_table_{uuid.uuid4().hex}" - conn.execute(f"CREATE OR REPLACE TEMP TABLE {ident(name)} AS {sql}") + conn.execute(f"CREATE OR REPLACE TEMP TABLE {q.ident(name)} AS {sql}") return name @@ -86,7 +86,7 @@ def finalize_relation( return conn.sql(sql) table = materialize_temp_table(conn, sql) conn.versus.temp_tables.append(table) - return conn.sql(f"SELECT * FROM {ident(table)}") + return conn.sql(f"SELECT * FROM {q.ident(table)}") def build_rows_relation( diff --git a/python/versus/comparison/_validation.py b/python/versus/comparison/_validation.py index ffb7dec..df2450d 100644 --- a/python/versus/comparison/_validation.py +++ b/python/versus/comparison/_validation.py @@ -6,7 +6,7 @@ import duckdb from ._exceptions import ComparisonError -from ._sql import run_sql, select_cols, table_ref +from . import _sql as q from ._types import _TableHandle, VersusConn if TYPE_CHECKING: # pragma: no cover @@ -102,13 +102,13 @@ def assert_unique_by( by_columns: List[str], identifier: str, ) -> None: - cols = select_cols(by_columns, alias="t") + cols = q.select_cols(by_columns, alias="t") sql = f""" SELECT {cols}, COUNT(*) AS n FROM - {table_ref(handle)} AS t + {q.table_ref(handle)} AS t GROUP BY {cols} HAVING @@ -116,7 +116,7 @@ def assert_unique_by( LIMIT 1 """ - rel = run_sql(conn, sql) + rel = q.run_sql(conn, sql) rows = rel.fetchall() if rows: first = rows[0] diff --git a/python/versus/comparison/_value_diffs.py b/python/versus/comparison/_value_diffs.py index 9cbd1fc..e4edbae 100644 --- a/python/versus/comparison/_value_diffs.py +++ b/python/versus/comparison/_value_diffs.py @@ -4,15 +4,16 @@ import duckdb -from . import _helpers as h +from . import _sql as q +from . import _validation as v if TYPE_CHECKING: # pragma: no cover from .comparison import Comparison def value_diffs(comparison: "Comparison", column: str) -> duckdb.DuckDBPyRelation: - target_col = h.normalize_single_column(column) - h.assert_column_allowed(comparison, target_col, "value_diffs") + target_col = v.normalize_single_column(column) + v.assert_column_allowed(comparison, target_col, "value_diffs") if comparison._materialize_mode == "all": relation = _value_diffs_with_diff_table(comparison, target_col) else: @@ -23,7 +24,7 @@ def value_diffs(comparison: "Comparison", column: str) -> duckdb.DuckDBPyRelatio def value_diffs_stacked( comparison: "Comparison", columns: Optional[Sequence[str]] = None ) -> duckdb.DuckDBPyRelation: - selected = h.resolve_column_list(comparison, columns, allow_empty=False) + selected = v.resolve_column_list(comparison, columns, allow_empty=False) diff_cols = comparison._filter_diff_columns(selected) if not diff_cols: return _empty_value_diffs_stacked(comparison, selected) @@ -31,7 +32,7 @@ def value_diffs_stacked( def stack_fn(column: str) -> str: return stack_value_diffs_sql( - comparison, column, h.collect_diff_keys(comparison, [column]) + comparison, column, q.collect_diff_keys(comparison, [column]) ) else: @@ -40,32 +41,32 @@ def stack_fn(column: str) -> str: selects = [stack_fn(column) for column in diff_cols] sql = " UNION ALL ".join(selects) - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def _value_diffs_with_diff_table( comparison: "Comparison", target_col: str ) -> duckdb.DuckDBPyRelation: - key_sql = h.collect_diff_keys(comparison, [target_col]) + key_sql = q.collect_diff_keys(comparison, [target_col]) table_a, table_b = comparison.table_id select_cols = [ - f"{h.col('a', target_col)} AS {h.ident(f'{target_col}_{table_a}')}", - f"{h.col('b', target_col)} AS {h.ident(f'{target_col}_{table_b}')}", - h.select_cols(comparison.by_columns, alias="keys"), + f"{q.col('a', target_col)} AS {q.ident(f'{target_col}_{table_a}')}", + f"{q.col('b', target_col)} AS {q.ident(f'{target_col}_{table_b}')}", + q.select_cols(comparison.by_columns, alias="keys"), ] - join_a = h.join_condition(comparison.by_columns, "keys", "a") - join_b = h.join_condition(comparison.by_columns, "keys", "b") + join_a = q.join_condition(comparison.by_columns, "keys", "a") + join_b = q.join_condition(comparison.by_columns, "keys", "b") sql = f""" SELECT {", ".join(select_cols)} FROM ({key_sql}) AS keys - JOIN {h.table_ref(comparison._handles[table_a])} AS a + JOIN {q.table_ref(comparison._handles[table_a])} AS a ON {join_a} - JOIN {h.table_ref(comparison._handles[table_b])} AS b + JOIN {q.table_ref(comparison._handles[table_b])} AS b ON {join_b} """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def _value_diffs_inline( @@ -73,14 +74,14 @@ def _value_diffs_inline( ) -> duckdb.DuckDBPyRelation: table_a, table_b = comparison.table_id select_cols = [ - f"{h.col('a', target_col)} AS {h.ident(f'{target_col}_{table_a}')}", - f"{h.col('b', target_col)} AS {h.ident(f'{target_col}_{table_b}')}", - h.select_cols(comparison.by_columns, alias="a"), + f"{q.col('a', target_col)} AS {q.ident(f'{target_col}_{table_a}')}", + f"{q.col('b', target_col)} AS {q.ident(f'{target_col}_{table_b}')}", + q.select_cols(comparison.by_columns, alias="a"), ] - join_sql = h.inputs_join_sql( + join_sql = q.inputs_join_sql( comparison._handles, comparison.table_id, comparison.by_columns ) - predicate = h.diff_predicate(target_col, comparison.allow_both_na, "a", "b") + predicate = q.diff_predicate(target_col, comparison.allow_both_na, "a", "b") sql = f""" SELECT {", ".join(select_cols)} @@ -89,7 +90,7 @@ def _value_diffs_inline( WHERE {predicate} """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def stack_value_diffs_sql( @@ -100,21 +101,21 @@ def stack_value_diffs_sql( table_a, table_b = comparison.table_id by_columns = comparison.by_columns select_parts = [ - f"{h.sql_literal(column)} AS {h.ident('column')}", - f"{h.col('a', column)} AS {h.ident(f'val_{table_a}')}", - f"{h.col('b', column)} AS {h.ident(f'val_{table_b}')}", - h.select_cols(by_columns, alias="keys"), + f"{q.sql_literal(column)} AS {q.ident('column')}", + f"{q.col('a', column)} AS {q.ident(f'val_{table_a}')}", + f"{q.col('b', column)} AS {q.ident(f'val_{table_b}')}", + q.select_cols(by_columns, alias="keys"), ] - join_a = h.join_condition(by_columns, "keys", "a") - join_b = h.join_condition(by_columns, "keys", "b") + join_a = q.join_condition(by_columns, "keys", "a") + join_b = q.join_condition(by_columns, "keys", "b") return f""" SELECT {", ".join(select_parts)} FROM ({key_sql}) AS keys - JOIN {h.table_ref(comparison._handles[table_a])} AS a + JOIN {q.table_ref(comparison._handles[table_a])} AS a ON {join_a} - JOIN {h.table_ref(comparison._handles[table_b])} AS b + JOIN {q.table_ref(comparison._handles[table_b])} AS b ON {join_b} """ @@ -122,15 +123,15 @@ def stack_value_diffs_sql( def stack_value_diffs_inline_sql(comparison: "Comparison", column: str) -> str: table_a, table_b = comparison.table_id select_parts = [ - f"{h.sql_literal(column)} AS {h.ident('column')}", - f"{h.col('a', column)} AS {h.ident(f'val_{table_a}')}", - f"{h.col('b', column)} AS {h.ident(f'val_{table_b}')}", - h.select_cols(comparison.by_columns, alias="a"), + f"{q.sql_literal(column)} AS {q.ident('column')}", + f"{q.col('a', column)} AS {q.ident(f'val_{table_a}')}", + f"{q.col('b', column)} AS {q.ident(f'val_{table_b}')}", + q.select_cols(comparison.by_columns, alias="a"), ] - join_sql = h.inputs_join_sql( + join_sql = q.inputs_join_sql( comparison._handles, comparison.table_id, comparison.by_columns ) - predicate = h.diff_predicate(column, comparison.allow_both_na, "a", "b") + predicate = q.diff_predicate(column, comparison.allow_both_na, "a", "b") return f""" SELECT {", ".join(select_parts)} @@ -153,16 +154,16 @@ def select_for(column: str) -> str: type_a = handle_a.types[column] type_b = handle_b.types[column] by_parts = [ - f"CAST(NULL AS {handle_a.types[by_col]}) AS {h.ident(by_col)}" + f"CAST(NULL AS {handle_a.types[by_col]}) AS {q.ident(by_col)}" for by_col in by_columns ] select_parts = [ - f"{h.sql_literal(column)} AS {h.ident('column')}", - f"CAST(NULL AS {type_a}) AS {h.ident(f'val_{table_a}')}", - f"CAST(NULL AS {type_b}) AS {h.ident(f'val_{table_b}')}", + f"{q.sql_literal(column)} AS {q.ident('column')}", + f"CAST(NULL AS {type_a}) AS {q.ident(f'val_{table_a}')}", + f"CAST(NULL AS {type_b}) AS {q.ident(f'val_{table_b}')}", *by_parts, ] return f"SELECT {', '.join(select_parts)} LIMIT 0" sql = " UNION ALL ".join(select_for(column) for column in columns) - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) diff --git a/python/versus/comparison/_weave.py b/python/versus/comparison/_weave.py index 65d8529..3d1a805 100644 --- a/python/versus/comparison/_weave.py +++ b/python/versus/comparison/_weave.py @@ -5,7 +5,9 @@ import duckdb -from . import _helpers as h +from . import _exceptions as e +from . import _sql as q +from . import _validation as v if TYPE_CHECKING: # pragma: no cover from .comparison import Comparison @@ -16,12 +18,12 @@ def weave_diffs_wide( columns: Optional[Sequence[str]] = None, suffix: Optional[Tuple[str, str]] = None, ) -> duckdb.DuckDBPyRelation: - selected = h.resolve_column_list(comparison, columns) + selected = v.resolve_column_list(comparison, columns) diff_cols = comparison._filter_diff_columns(selected) table_a, table_b = comparison.table_id out_cols = comparison.by_columns + comparison.common_columns if not diff_cols: - return h.select_zero_from_table(comparison, table_a, out_cols) + return q.select_zero_from_table(comparison, table_a, out_cols) if comparison._materialize_mode == "all": relation = _weave_diffs_wide_with_keys(comparison, diff_cols, suffix) else: @@ -33,17 +35,17 @@ def weave_diffs_long( comparison: "Comparison", columns: Optional[Sequence[str]] = None, ) -> duckdb.DuckDBPyRelation: - selected = h.resolve_column_list(comparison, columns) + selected = v.resolve_column_list(comparison, columns) diff_cols = comparison._filter_diff_columns(selected) table_a, table_b = comparison.table_id out_cols = comparison.by_columns + comparison.common_columns if not diff_cols: - base = h.select_zero_from_table(comparison, table_a, out_cols) + base = q.select_zero_from_table(comparison, table_a, out_cols) relation = base.query( "base", ( - f"SELECT {h.sql_literal(table_a)} AS table_name, " - f"{h.select_cols(out_cols)} FROM base" + f"SELECT {q.sql_literal(table_a)} AS table_name, " + f"{q.select_cols(out_cols)} FROM base" ), ) return relation @@ -64,12 +66,12 @@ def _weave_select_parts( def parts_for(column: str) -> List[str]: if column in diff_set: return [ - f"{h.col('a', column)} AS {h.ident(f'{column}{suffix[0]}')}", - f"{h.col('b', column)} AS {h.ident(f'{column}{suffix[1]}')}", + f"{q.col('a', column)} AS {q.ident(f'{column}{suffix[0]}')}", + f"{q.col('b', column)} AS {q.ident(f'{column}{suffix[1]}')}", ] - return [h.col("a", column)] + return [q.col("a", column)] - by_parts = [h.col("a", column) for column in comparison.by_columns] + by_parts = [q.col("a", column) for column in comparison.by_columns] common_parts = list( chain.from_iterable(parts_for(column) for column in comparison.common_columns) ) @@ -83,21 +85,21 @@ def _weave_diffs_wide_with_keys( ) -> duckdb.DuckDBPyRelation: table_a, table_b = comparison.table_id suffix = resolve_suffix(suffix, comparison.table_id) - keys = h.collect_diff_keys(comparison, diff_cols) + keys = q.collect_diff_keys(comparison, diff_cols) select_parts = _weave_select_parts(comparison, diff_cols, suffix) - join_a = h.join_condition(comparison.by_columns, "keys", "a") - join_b = h.join_condition(comparison.by_columns, "keys", "b") + join_a = q.join_condition(comparison.by_columns, "keys", "a") + join_b = q.join_condition(comparison.by_columns, "keys", "b") sql = f""" SELECT {", ".join(select_parts)} FROM ({keys}) AS keys - JOIN {h.table_ref(comparison._handles[table_a])} AS a + JOIN {q.table_ref(comparison._handles[table_a])} AS a ON {join_a} - JOIN {h.table_ref(comparison._handles[table_b])} AS b + JOIN {q.table_ref(comparison._handles[table_b])} AS b ON {join_b} """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def _weave_diffs_wide_inline( @@ -108,11 +110,11 @@ def _weave_diffs_wide_inline( table_a, table_b = comparison.table_id suffix = resolve_suffix(suffix, comparison.table_id) select_parts = _weave_select_parts(comparison, diff_cols, suffix) - join_sql = h.inputs_join_sql( + join_sql = q.inputs_join_sql( comparison._handles, comparison.table_id, comparison.by_columns ) predicate = " OR ".join( - h.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols + q.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols ) sql = f""" SELECT @@ -122,7 +124,7 @@ def _weave_diffs_wide_inline( WHERE {predicate} """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def _weave_diffs_long_with_keys( @@ -130,13 +132,13 @@ def _weave_diffs_long_with_keys( ) -> duckdb.DuckDBPyRelation: table_a, table_b = comparison.table_id out_cols = comparison.by_columns + comparison.common_columns - keys = h.collect_diff_keys(comparison, diff_cols) - table_column = h.ident("table_name") - select_cols_a = h.select_cols(out_cols, alias="a") - select_cols_b = h.select_cols(out_cols, alias="b") - join_a = h.join_condition(comparison.by_columns, "keys", "a") - join_b = h.join_condition(comparison.by_columns, "keys", "b") - order_cols = h.select_cols(comparison.by_columns) + keys = q.collect_diff_keys(comparison, diff_cols) + table_column = q.ident("table_name") + select_cols_a = q.select_cols(out_cols, alias="a") + select_cols_b = q.select_cols(out_cols, alias="b") + join_a = q.join_condition(comparison.by_columns, "keys", "a") + join_b = q.join_condition(comparison.by_columns, "keys", "b") + order_cols = q.select_cols(comparison.by_columns) sql = f""" WITH keys AS ( @@ -144,7 +146,7 @@ def _weave_diffs_long_with_keys( ) SELECT {table_column}, - {h.select_cols(out_cols)} + {q.select_cols(out_cols)} FROM ( SELECT @@ -153,7 +155,7 @@ def _weave_diffs_long_with_keys( {select_cols_a} FROM keys - JOIN {h.table_ref(comparison._handles[table_a])} AS a + JOIN {q.table_ref(comparison._handles[table_a])} AS a ON {join_a} UNION ALL SELECT @@ -162,14 +164,14 @@ def _weave_diffs_long_with_keys( {select_cols_b} FROM keys - JOIN {h.table_ref(comparison._handles[table_b])} AS b + JOIN {q.table_ref(comparison._handles[table_b])} AS b ON {join_b} ) AS stacked ORDER BY {order_cols}, __table_order """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) def _weave_diffs_long_inline( @@ -177,20 +179,20 @@ def _weave_diffs_long_inline( ) -> duckdb.DuckDBPyRelation: table_a, table_b = comparison.table_id out_cols = comparison.by_columns + comparison.common_columns - table_column = h.ident("table_name") - select_cols_a = h.select_cols(out_cols, alias="a") - select_cols_b = h.select_cols(out_cols, alias="b") - join_sql = h.inputs_join_sql( + table_column = q.ident("table_name") + select_cols_a = q.select_cols(out_cols, alias="a") + select_cols_b = q.select_cols(out_cols, alias="b") + join_sql = q.inputs_join_sql( comparison._handles, comparison.table_id, comparison.by_columns ) predicate = " OR ".join( - h.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols + q.diff_predicate(col, comparison.allow_both_na, "a", "b") for col in diff_cols ) - order_cols = h.select_cols(comparison.by_columns) + order_cols = q.select_cols(comparison.by_columns) sql = f""" SELECT {table_column}, - {h.select_cols(out_cols)} + {q.select_cols(out_cols)} FROM ( SELECT @@ -215,7 +217,7 @@ def _weave_diffs_long_inline( {order_cols}, __table_order """ - return h.run_sql(comparison.connection, sql) + return q.run_sql(comparison.connection, sql) # ------- helpers @@ -229,7 +231,7 @@ def resolve_suffix( or len(suffix) != 2 or not all(isinstance(item, str) for item in suffix) ): - raise h.ComparisonError("`suffix` must be a tuple of two strings or None") + raise e.ComparisonError("`suffix` must be a tuple of two strings or None") if suffix[0] == suffix[1]: - raise h.ComparisonError("Entries of `suffix` must be distinct") + raise e.ComparisonError("Entries of `suffix` must be distinct") return (suffix[0], suffix[1]) diff --git a/python/versus/comparison/api.py b/python/versus/comparison/api.py index 19837f2..0745005 100644 --- a/python/versus/comparison/api.py +++ b/python/versus/comparison/api.py @@ -18,8 +18,9 @@ import duckdb -from . import _frames as c -from . import _helpers as h +from . import _frames as f +from . import _inputs as i +from . import _validation as v from .comparison import Comparison if TYPE_CHECKING: # pragma: no cover @@ -82,24 +83,24 @@ def compare( │ type_diffs │ false │ └────────────────┴─────────┘ """ - materialize_summary, materialize_keys = h.resolve_materialize(materialize) + materialize_summary, materialize_keys = v.resolve_materialize(materialize) - conn = h.resolve_connection(connection) - clean_ids = h.validate_table_id(table_id) - by_columns = h.normalize_column_list(by, "by", allow_empty=False) + conn = v.resolve_connection(connection) + clean_ids = v.validate_table_id(table_id) + by_columns = v.normalize_column_list(by, "by", allow_empty=False) connection_supplied = connection is not None handles = { - clean_ids[0]: h.build_table_handle( + clean_ids[0]: i.build_table_handle( conn, table_a, clean_ids[0], connection_supplied=connection_supplied ), - clean_ids[1]: h.build_table_handle( + clean_ids[1]: i.build_table_handle( conn, table_b, clean_ids[1], connection_supplied=connection_supplied ), } - h.validate_tables(conn, handles, clean_ids, by_columns, coerce=coerce) + v.validate_tables(conn, handles, clean_ids, by_columns, coerce=coerce) - tables_frame = c.build_tables_frame(conn, handles, clean_ids, materialize_summary) - by_frame = c.build_by_frame( + tables_frame = f.build_tables_frame(conn, handles, clean_ids, materialize_summary) + by_frame = f.build_by_frame( conn, by_columns, handles, clean_ids, materialize_summary ) common_all = [ @@ -108,12 +109,12 @@ def compare( if col in handles[clean_ids[1]].columns ] value_columns = [col for col in common_all if col not in by_columns] - unmatched_cols = c.build_unmatched_cols( + unmatched_cols = f.build_unmatched_cols( conn, handles, clean_ids, materialize_summary ) diff_table = None if materialize_keys: - diff_table = c.compute_diff_table( + diff_table = f.compute_diff_table( conn, handles, clean_ids, @@ -121,7 +122,7 @@ def compare( value_columns, allow_both_na, ) - intersection, diff_lookup = c.build_intersection_frame( + intersection, diff_lookup = f.build_intersection_frame( value_columns, handles, clean_ids, @@ -131,10 +132,10 @@ def compare( conn, materialize_summary, ) - unmatched_keys = c.compute_unmatched_keys( + unmatched_keys = f.compute_unmatched_keys( conn, handles, clean_ids, by_columns, materialize_keys ) - unmatched_rows_rel, unmatched_lookup = c.compute_unmatched_rows_summary( + unmatched_rows_rel, unmatched_lookup = f.compute_unmatched_rows_summary( conn, unmatched_keys, clean_ids, materialize_summary ) diff --git a/python/versus/comparison/comparison.py b/python/versus/comparison/comparison.py index ef31a89..d8aceed 100644 --- a/python/versus/comparison/comparison.py +++ b/python/versus/comparison/comparison.py @@ -4,8 +4,14 @@ import duckdb -from . import _helpers as h -from . import _slices, _value_diffs, _weave +from . import _exceptions as e +from . import _relations as r +from . import _slices as l +from . import _sql as q +from . import _summary as s +from . import _types as t +from . import _value_diffs as d +from . import _weave as w class Comparison: @@ -18,8 +24,8 @@ class Comparison: def __init__( self, *, - connection: h.VersusConn, - handles: Mapping[str, h._TableHandle], + connection: t.VersusConn, + handles: Mapping[str, t._TableHandle], table_id: Tuple[str, str], by_columns: List[str], allow_both_na: bool, @@ -48,21 +54,21 @@ def __init__( self._diff_lookup = diff_lookup self._unmatched_lookup = unmatched_lookup summary_materialized = materialize_mode in {"all", "summary"} - self.tables = h.SummaryRelation( + self.tables = s.SummaryRelation( connection, tables, materialized=summary_materialized ) - self.by = h.SummaryRelation(connection, by, materialized=summary_materialized) - self.intersection = h.SummaryRelation( + self.by = s.SummaryRelation(connection, by, materialized=summary_materialized) + self.intersection = s.SummaryRelation( connection, intersection, materialized=summary_materialized, on_materialize=self._store_diff_lookup, ) - self.unmatched_cols = h.SummaryRelation( + self.unmatched_cols = s.SummaryRelation( connection, unmatched_cols, materialized=summary_materialized ) self.unmatched_keys = unmatched_keys - self.unmatched_rows = h.SummaryRelation( + self.unmatched_rows = s.SummaryRelation( connection, unmatched_rows, materialized=summary_materialized, @@ -71,7 +77,7 @@ def __init__( self.common_columns = common_columns self.table_columns = table_columns if materialize_mode == "all" and diff_table is None: - raise h.ComparisonError("Diff table is required when materialize='all'.") + raise e.ComparisonError("Diff table is required when materialize='all'.") self.diff_table = diff_table self._closed = False @@ -83,11 +89,11 @@ def _filter_diff_columns(self, columns: Sequence[str]) -> List[str]: def _store_diff_lookup(self, relation: duckdb.DuckDBPyRelation) -> None: if self._diff_lookup is None: - self._diff_lookup = h.diff_lookup_from_intersection(relation) + self._diff_lookup = r.diff_lookup_from_intersection(relation) def _store_unmatched_lookup(self, relation: duckdb.DuckDBPyRelation) -> None: if self._unmatched_lookup is None: - self._unmatched_lookup = h.unmatched_lookup_from_rows(relation) + self._unmatched_lookup = r.unmatched_lookup_from_rows(relation) def close(self) -> None: """Release any temporary views or tables created for the comparison. @@ -106,12 +112,12 @@ def close(self) -> None: return for view in reversed(self.connection.versus.views): try: - self.connection.execute(f"DROP VIEW IF EXISTS {h.ident(view)}") + self.connection.execute(f"DROP VIEW IF EXISTS {q.ident(view)}") except duckdb.Error: pass for view in self.connection.versus.temp_tables: try: - self.connection.execute(f"DROP TABLE IF EXISTS {h.ident(view)}") + self.connection.execute(f"DROP TABLE IF EXISTS {q.ident(view)}") except duckdb.Error: pass self._closed = True @@ -163,7 +169,7 @@ def value_diffs(self, column: str) -> duckdb.DuckDBPyRelation: │ 259 │ 258 │ Hornet 4 Drive │ └────────┴────────┴────────────────┘ """ - return _value_diffs.value_diffs(self, column) + return d.value_diffs(self, column) def value_diffs_stacked( self, columns: Optional[Sequence[str]] = None @@ -199,7 +205,7 @@ def value_diffs_stacked( │ disp │ 259.0 │ 258.0 │ Hornet 4 Drive │ └─────────┴───────────────┴───────────────┴────────────────┘ """ - return _value_diffs.value_diffs_stacked(self, columns) + return d.value_diffs_stacked(self, columns) def slice_diffs( self, @@ -237,7 +243,7 @@ def slice_diffs( │ Merc 240D │ 24.4 │ 4 │ 147 │ 62 │ 3.69 │ 3.19 │ 1 │ 0 │ └────────────┴──────────────┴───────┴───────┴───────┴──────────────┴──────────────┴───────┴───────┘ """ - return _slices.slice_diffs(self, table, columns) + return l.slice_diffs(self, table, columns) def slice_unmatched(self, table: str) -> duckdb.DuckDBPyRelation: """Return rows from one table whose keys are missing in the other. @@ -268,7 +274,7 @@ def slice_unmatched(self, table: str) -> duckdb.DuckDBPyRelation: │ Mazda RX4 │ 21.0 │ 6 │ 160 │ 110 │ 3.90 │ 2.62 │ 0 │ 1 │ └───────────┴──────────────┴───────┴───────┴───────┴──────────────┴──────────────┴───────┴───────┘ """ - return _slices.slice_unmatched(self, table) + return l.slice_unmatched(self, table) def slice_unmatched_both(self) -> duckdb.DuckDBPyRelation: """Return unmatched rows from both tables. @@ -296,7 +302,7 @@ def slice_unmatched_both(self) -> duckdb.DuckDBPyRelation: │ b │ Merc 450SE │ 16.4 │ 8 │ 276 │ 180 │ 3.07 │ 4.07 │ 0 │ └────────────┴────────────┴──────────────┴───────┴───────┴───────┴──────────────┴──────────────┴───────┘ """ - return _slices.slice_unmatched_both(self) + return l.slice_unmatched_both(self) def weave_diffs_wide( self, @@ -335,7 +341,7 @@ def weave_diffs_wide( │ Hornet 4 Drive │ 21.4 │ 6 │ 259 │ 258 │ 110 │ 3.08 │ 3.22 │ 1 │ └────────────────┴──────────────┴───────┴────────┴────────┴───────┴──────────────┴──────────────┴───────┘ """ - return _weave.weave_diffs_wide(self, columns, suffix) + return w.weave_diffs_wide(self, columns, suffix) def weave_diffs_long( self, @@ -372,7 +378,7 @@ def weave_diffs_long( │ b │ Hornet 4 Drive │ 21.4 │ 6 │ 258 │ 110 │ 3.08 │ 3.22 │ 1 │ └────────────┴────────────────┴──────────────┴───────┴───────┴───────┴──────────────┴──────────────┴───────┘ """ - return _weave.weave_diffs_long(self, columns) + return w.weave_diffs_long(self, columns) def summary(self) -> duckdb.DuckDBPyRelation: """Summarize which difference categories are present. @@ -401,18 +407,18 @@ def summary(self) -> duckdb.DuckDBPyRelation: │ type_diffs │ false │ └────────────────┴─────────┘ """ - value_diffs = not h.relation_is_empty( - self.intersection.filter(f"{h.ident('n_diffs')} > 0") + value_diffs = not r.relation_is_empty( + self.intersection.filter(f"{q.ident('n_diffs')} > 0") ) - unmatched_cols = not h.relation_is_empty(self.unmatched_cols) - unmatched_rows = not h.relation_is_empty( - self.unmatched_rows.filter(f"{h.ident('n_unmatched')} > 0") + unmatched_cols = not r.relation_is_empty(self.unmatched_cols) + unmatched_rows = not r.relation_is_empty( + self.unmatched_rows.filter(f"{q.ident('n_unmatched')} > 0") ) type_a_col = f"type_{self.table_id[0]}" type_b_col = f"type_{self.table_id[1]}" - type_diffs = not h.relation_is_empty( + type_diffs = not r.relation_is_empty( self.intersection.filter( - f"{h.ident(type_a_col)} IS DISTINCT FROM {h.ident(type_b_col)}" + f"{q.ident(type_a_col)} IS DISTINCT FROM {q.ident(type_b_col)}" ) ) rows = [ @@ -422,7 +428,7 @@ def summary(self) -> duckdb.DuckDBPyRelation: ("type_diffs", type_diffs), ] schema = [("difference", "VARCHAR"), ("found", "BOOLEAN")] - summary_rel = h.build_rows_relation( + summary_rel = s.build_rows_relation( self.connection, rows, schema, materialize=False ) return summary_rel From 8a1782efce2e1da264bad010b95779cecac2b9ea Mon Sep 17 00:00:00 2001 From: eutwt <11261404+eutwt@users.noreply.github.com> Date: Sun, 28 Dec 2025 16:30:54 -0500 Subject: [PATCH 3/3] Fix ruff import order --- python/versus/comparison/__init__.py | 2 +- python/versus/comparison/_inputs.py | 4 ++-- python/versus/comparison/_sql.py | 2 +- python/versus/comparison/_validation.py | 4 ++-- python/versus/comparison/api.py | 11 +---------- 5 files changed, 7 insertions(+), 16 deletions(-) diff --git a/python/versus/comparison/__init__.py b/python/versus/comparison/__init__.py index 86e2ab5..1646000 100644 --- a/python/versus/comparison/__init__.py +++ b/python/versus/comparison/__init__.py @@ -1,6 +1,6 @@ +from ._exceptions import ComparisonError from .api import compare from .comparison import Comparison -from ._exceptions import ComparisonError __all__ = [ "Comparison", diff --git a/python/versus/comparison/_inputs.py b/python/versus/comparison/_inputs.py index ac173e4..793e6e7 100644 --- a/python/versus/comparison/_inputs.py +++ b/python/versus/comparison/_inputs.py @@ -5,10 +5,10 @@ import duckdb -from ._exceptions import ComparisonError from . import _sql as q from . import _validation as v -from ._types import _Input, _TableHandle, VersusConn +from ._exceptions import ComparisonError +from ._types import VersusConn, _Input, _TableHandle if TYPE_CHECKING: # pragma: no cover import pandas diff --git a/python/versus/comparison/_sql.py b/python/versus/comparison/_sql.py index 7387a5e..7bb5d61 100644 --- a/python/versus/comparison/_sql.py +++ b/python/versus/comparison/_sql.py @@ -5,7 +5,7 @@ import duckdb from ._exceptions import ComparisonError -from ._types import _TableHandle, VersusConn +from ._types import VersusConn, _TableHandle if TYPE_CHECKING: # pragma: no cover from .comparison import Comparison diff --git a/python/versus/comparison/_validation.py b/python/versus/comparison/_validation.py index df2450d..8a7f01c 100644 --- a/python/versus/comparison/_validation.py +++ b/python/versus/comparison/_validation.py @@ -5,9 +5,9 @@ import duckdb -from ._exceptions import ComparisonError from . import _sql as q -from ._types import _TableHandle, VersusConn +from ._exceptions import ComparisonError +from ._types import VersusConn, _TableHandle if TYPE_CHECKING: # pragma: no cover from .comparison import Comparison diff --git a/python/versus/comparison/api.py b/python/versus/comparison/api.py index 0745005..3d81365 100644 --- a/python/versus/comparison/api.py +++ b/python/versus/comparison/api.py @@ -1,15 +1,6 @@ from __future__ import annotations -from typing import ( - TYPE_CHECKING, - Dict, - List, - Mapping, - Optional, - Sequence, - Tuple, - Union, -) +from typing import TYPE_CHECKING, Optional, Sequence, Tuple, Union try: from typing import Literal