diff --git a/.circleci/wait-for-db.sh b/.circleci/wait-for-db.sh index a313320279..3a2d2c652b 100755 --- a/.circleci/wait-for-db.sh +++ b/.circleci/wait-for-db.sh @@ -59,6 +59,14 @@ risingwave_ready() { probe_port 4566 } +gizmosql_ready() { + # GizmoSQL uses port 31337 for Flight SQL connections + # Also check that the server has fully started by looking for the startup message + probe_port 31337 + # Give it a few more seconds for the server to initialize after port is available + sleep 3 +} + echo "Waiting for $ENGINE to be ready..." READINESS_FUNC="${ENGINE}_ready" diff --git a/Makefile b/Makefile index 611b179eba..384c851ab0 100644 --- a/Makefile +++ b/Makefile @@ -208,6 +208,9 @@ trino-test: engine-trino-up risingwave-test: engine-risingwave-up pytest -n auto -m "risingwave" --reruns 3 --junitxml=test-results/junit-risingwave.xml +gizmosql-test: engine-gizmosql-up + pytest -n auto -m "gizmosql" --reruns 3 --junitxml=test-results/junit-gizmosql.xml + ################# # Cloud Engines # ################# diff --git a/docs/integrations/engines/gizmosql.md b/docs/integrations/engines/gizmosql.md new file mode 100644 index 0000000000..ba633ba9e5 --- /dev/null +++ b/docs/integrations/engines/gizmosql.md @@ -0,0 +1,144 @@ +# GizmoSQL + +This page provides information about how to use SQLMesh with the [GizmoSQL](https://github.com/gizmodata/gizmosql) database server. + +!!! info + The GizmoSQL engine adapter is a community contribution. Due to this, only limited community support is available. + +## Overview + +GizmoSQL is a database server that uses [DuckDB](./duckdb.md) as its execution engine and exposes an [Apache Arrow Flight SQL](https://arrow.apache.org/docs/format/FlightSql.html) interface for remote connections. This allows you to connect to a GizmoSQL server from anywhere on your network while still benefiting from DuckDB's fast analytical query processing. 
+ +The SQLMesh GizmoSQL adapter uses [ADBC (Arrow Database Connectivity)](https://arrow.apache.org/docs/format/ADBC.html) with the Flight SQL driver to communicate with GizmoSQL servers. Data is transferred using the efficient Apache Arrow columnar format. + +!!! note + This adapter only supports the DuckDB backend for GizmoSQL. Attempting to connect to a GizmoSQL server running a different backend will result in an error. + +## Local/Built-in Scheduler + +**Engine Adapter Type**: `gizmosql` + +### Installation + +``` +pip install "sqlmesh[gizmosql]" +``` + +This will install the required dependencies: + +- `adbc-driver-flightsql` - The ADBC driver for Arrow Flight SQL +- `pyarrow` - Apache Arrow Python bindings + +## Connection options + +| Option | Description | Type | Required | +|------------------------------------|-------------------------------------------------------------------------------|:-------:|:--------:| +| `type` | Engine type name - must be `gizmosql` | string | Y | +| `host` | The hostname of the GizmoSQL server | string | N | +| `port` | The port number of the GizmoSQL server (default: `31337`) | int | N | +| `username` | The username for authentication with the GizmoSQL server | string | Y | +| `password` | The password for authentication with the GizmoSQL server | string | Y | +| `use_encryption` | Whether to use TLS encryption for the connection (default: `true`) | bool | N | +| `disable_certificate_verification`| Skip TLS certificate verification - useful for self-signed certs (default: `false`) | bool | N | +| `database` | The default database/catalog to use | string | N | + +### Example configuration + +=== "YAML" + + ```yaml linenums="1" + gateways: + gizmosql: + connection: + type: gizmosql + host: gizmosql.example.com + port: 31337 + username: my_user + password: my_password + use_encryption: true + disable_certificate_verification: false + ``` + +=== "Python" + + ```python linenums="1" + from sqlmesh.core.config import ( + Config, + 
GatewayConfig, + ModelDefaultsConfig, + ) + from sqlmesh.core.config.connection import GizmoSQLConnectionConfig + + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + gateways={ + "gizmosql": GatewayConfig( + connection=GizmoSQLConnectionConfig( + host="gizmosql.example.com", + port=31337, + username="my_user", + password="my_password", + use_encryption=True, + disable_certificate_verification=False, + ), + ), + }, + ) + ``` + +## SQL Dialect + +GizmoSQL uses the DuckDB SQL dialect. When writing models for GizmoSQL, set your model dialect to `duckdb`: + +```yaml +model_defaults: + dialect: duckdb +``` + +Or specify the dialect in individual model definitions: + +```sql +MODEL ( + name my_schema.my_model, + dialect duckdb +); + +SELECT * FROM my_table; +``` + +## Docker Setup + +For local development and testing, you can run GizmoSQL using Docker: + +```bash +docker run -d \ + --name gizmosql \ + -p 31337:31337 \ + -e GIZMOSQL_USERNAME=gizmosql_user \ + -e GIZMOSQL_PASSWORD=gizmosql_password \ + -e TLS_ENABLED=1 \ + gizmodata/gizmosql:latest +``` + +Then connect with: + +```yaml +gateways: + gizmosql: + connection: + type: gizmosql + host: localhost + port: 31337 + username: gizmosql_user + password: gizmosql_password + use_encryption: true + disable_certificate_verification: true # For self-signed certs +``` + +## Related Integrations + +GizmoSQL has adapters available for other popular data tools: + +- [Ibis GizmoSQL](https://pypi.org/project/ibis-gizmosql/) - Ibis backend for GizmoSQL +- [dbt-gizmosql](https://pypi.org/search/?q=dbt-gizmosql) - dbt adapter for GizmoSQL +- [SQLFrame GizmoSQL](https://github.com/gizmodata/sqlframe) - SQLFrame (PySpark-like API) support for GizmoSQL diff --git a/docs/integrations/overview.md b/docs/integrations/overview.md index 94b9289d21..a5059fc653 100644 --- a/docs/integrations/overview.md +++ b/docs/integrations/overview.md @@ -23,6 +23,7 @@ SQLMesh supports the following execution engines for running 
SQLMesh projects (e * [MySQL](./engines/mysql.md) (mysql) * [Postgres](./engines/postgres.md) (postgres) * [GCP Postgres](./engines/gcp-postgres.md) (gcppostgres) +* [GizmoSQL](./engines/gizmosql.md) (gizmosql) * [Redshift](./engines/redshift.md) (redshift) * [Snowflake](./engines/snowflake.md) (snowflake) * [Spark](./engines/spark.md) (spark) diff --git a/mkdocs.yml b/mkdocs.yml index 47ddca54e9..cf415f10e8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -89,6 +89,7 @@ nav: - integrations/engines/mysql.md - integrations/engines/postgres.md - integrations/engines/gcp-postgres.md + - integrations/engines/gizmosql.md - integrations/engines/redshift.md - integrations/engines/risingwave.md - integrations/engines/snowflake.md diff --git a/pyproject.toml b/pyproject.toml index 2c140d4770..310e0e8aff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,6 +144,7 @@ lsp = [ "lsprotocol", ] risingwave = ["psycopg2"] +gizmosql = ["adbc-driver-flightsql", "pyarrow"] [project.scripts] sqlmesh = "sqlmesh.cli.main:cli" @@ -271,6 +272,7 @@ markers = [ "pyspark: test for PySpark that need to run separately from the other spark tests", "trino: test for Trino (all connectors)", "risingwave: test for Risingwave", + "gizmosql: test for GizmoSQL", # Other "set_default_connection", diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 638f0c28c8..69cf77f201 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2326,6 +2326,104 @@ def init(cursor: t.Any) -> None: return init +class GizmoSQLConnectionConfig(ConnectionConfig): + """ + GizmoSQL connection configuration. + + GizmoSQL is a database server that uses DuckDB as its execution engine and + exposes an Arrow Flight SQL interface for remote connections. This configuration + uses ADBC (Arrow Database Connectivity) with the Flight SQL driver. + + Args: + host: The hostname of the GizmoSQL server. + port: The port of the GizmoSQL server (default: 31337). 
+ username: The username for authentication. + password: The password for authentication. + use_encryption: Whether to use TLS encryption (default: True). + disable_certificate_verification: Whether to skip TLS certificate verification. + Useful for self-signed certificates in development (default: False). + database: The default database/catalog to use. + concurrent_tasks: The maximum number of concurrent tasks. + register_comments: Whether to register model comments. + pre_ping: Whether to pre-ping the connection. + """ + + host: str = "localhost" + port: int = 31337 + username: str + password: str + use_encryption: bool = True + disable_certificate_verification: bool = False + database: t.Optional[str] = None + + concurrent_tasks: int = 4 + register_comments: bool = True + pre_ping: bool = False + + type_: t.Literal["gizmosql"] = Field(alias="type", default="gizmosql") + DIALECT: t.ClassVar[t.Literal["duckdb"]] = "duckdb" + DISPLAY_NAME: t.ClassVar[t.Literal["GizmoSQL"]] = "GizmoSQL" + DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 + + _engine_import_validator = _get_engine_import_validator( + "adbc_driver_flightsql", "gizmosql", extra_name="gizmosql" + ) + + @property + def _connection_kwargs_keys(self) -> t.Set[str]: + # ADBC uses a different connection pattern, so we don't pass these directly + return set() + + @property + def _engine_adapter(self) -> t.Type[EngineAdapter]: + return engine_adapter.GizmoSQLEngineAdapter + + @property + def _connection_factory(self) -> t.Callable: + """ + Create a connection factory for GizmoSQL using ADBC Flight SQL driver. + + The connection is established using the Arrow Flight SQL protocol over gRPC. 
+ """ + import re + from adbc_driver_flightsql import dbapi as flightsql, DatabaseOptions + + def connect() -> t.Any: + # Build the URI for the Flight SQL connection + protocol = "grpc+tls" if self.use_encryption else "grpc" + uri = f"{protocol}://{self.host}:{self.port}" + + # ADBC database-level options (passed to the driver) + db_kwargs: t.Dict[str, str] = { + "username": self.username, + "password": self.password, + } + + # Add TLS skip verify option using the proper DatabaseOptions enum + if self.use_encryption and self.disable_certificate_verification: + db_kwargs[DatabaseOptions.TLS_SKIP_VERIFY.value] = "true" + + # Create the connection - uri is first positional arg, db_kwargs is for driver options + # Explicit autocommit=True since GizmoSQL doesn't support manual transaction commits + conn = flightsql.connect(uri, db_kwargs=db_kwargs, autocommit=True) + + # Verify the backend is DuckDB - this adapter only supports the DuckDB backend + vendor_version = conn.adbc_get_info().get("vendor_version", "") + if not re.search(pattern=r"^duckdb ", string=vendor_version): + conn.close() + raise ConfigError( + f"Unsupported GizmoSQL server backend: '{vendor_version}'. " + "This adapter only supports the DuckDB backend for GizmoSQL." + ) + + return conn + + return connect + + def get_catalog(self) -> t.Optional[str]: + return self.database + + CONNECTION_CONFIG_TO_TYPE = { # Map all subclasses of ConnectionConfig to the value of their `type_` field. 
tpe.all_field_infos()["type_"].default: tpe diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index ab29885c7b..7a20bcea5f 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -20,6 +20,7 @@ from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter from sqlmesh.core.engine_adapter.fabric import FabricEngineAdapter +from sqlmesh.core.engine_adapter.gizmosql import GizmoSQLEngineAdapter DIALECT_TO_ENGINE_ADAPTER = { "hive": SparkEngineAdapter, @@ -37,6 +38,7 @@ "athena": AthenaEngineAdapter, "risingwave": RisingwaveEngineAdapter, "fabric": FabricEngineAdapter, + "gizmosql": GizmoSQLEngineAdapter, } DIALECT_ALIASES = { diff --git a/sqlmesh/core/engine_adapter/gizmosql.py b/sqlmesh/core/engine_adapter/gizmosql.py new file mode 100644 index 0000000000..60a285c3b2 --- /dev/null +++ b/sqlmesh/core/engine_adapter/gizmosql.py @@ -0,0 +1,284 @@ +from __future__ import annotations + +import contextlib +import typing as t + +from sqlglot import exp + +from sqlmesh.core.engine_adapter.mixins import ( + GetCurrentCatalogFromFunctionMixin, + LogicalMergeMixin, + PandasNativeFetchDFSupportMixin, + RowDiffMixin, +) +from sqlmesh.core.engine_adapter.shared import ( + CatalogSupport, + CommentCreationTable, + CommentCreationView, + DataObject, + DataObjectType, + SourceQuery, + set_catalog, +) + +if t.TYPE_CHECKING: + from sqlmesh.core._typing import SchemaName, TableName + from sqlmesh.core.engine_adapter._typing import DF + + +@set_catalog(override_mapping={"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG}) +class GizmoSQLEngineAdapter( + LogicalMergeMixin, + GetCurrentCatalogFromFunctionMixin, + PandasNativeFetchDFSupportMixin, + RowDiffMixin, +): + """ + GizmoSQL Engine Adapter. 
+ + GizmoSQL is a database server that uses DuckDB as its execution engine and + exposes an Arrow Flight SQL interface for remote connections. This adapter + uses ADBC (Arrow Database Connectivity) with the Flight SQL driver to + communicate with GizmoSQL servers. + + Key characteristics: + - Uses DuckDB SQL dialect for query generation + - Connects remotely via Arrow Flight SQL (ADBC) + - Supports full catalog operations + - Returns data as Arrow tables for efficient transfer + """ + + DIALECT = "duckdb" + SUPPORTS_TRANSACTIONS = True + SCHEMA_DIFFER_KWARGS = { + "parameterized_type_defaults": { + exp.DataType.build("DECIMAL", dialect="duckdb").this: [(18, 3), (0,)], + }, + } + COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY + COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY + SUPPORTS_CREATE_DROP_CATALOG = True + SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"] + + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + + # DDL/DML keywords that need fetch to trigger GizmoSQL's lazy execution + _DDL_DML_KEYWORDS = frozenset({ + "CREATE", "DROP", "ALTER", "TRUNCATE", "RENAME", "COMMENT", "USE", "SET", + "INSERT", "UPDATE", "DELETE", "MERGE", "COPY", "ATTACH", "DETACH", + }) + + def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: + """ + Execute a SQL statement. + + GizmoSQL uses lazy execution - statements are not actually executed + until results are fetched. For DDL/DML statements, we call fetchall() + to ensure immediate execution. For SELECT queries, we let the caller + fetch the results. 
+ """ + self.cursor.execute(sql, **kwargs) + + # For DDL/DML, fetch to trigger GizmoSQL's lazy execution + sql_upper = sql.strip().upper() + first_word = sql_upper.split()[0] if sql_upper else "" + if first_word in self._DDL_DML_KEYWORDS: + self.cursor.fetchall() + + @contextlib.contextmanager + def transaction( + self, + condition: t.Optional[bool] = None, + ) -> t.Iterator[None]: + """ + A transaction context manager using SQL statements. + + GizmoSQL's ADBC connection doesn't support the standard begin/commit/rollback + methods, so we use explicit SQL statements (BEGIN TRANSACTION, COMMIT, ROLLBACK) + for transaction control. + """ + if ( + self._connection_pool.is_transaction_active + or (condition is not None and not condition) + ): + yield + return + + self._connection_pool.begin() + self.cursor.execute("BEGIN TRANSACTION") + self.cursor.fetchall() + try: + yield + except Exception as e: + self.cursor.execute("ROLLBACK") + self.cursor.fetchall() + self._connection_pool.rollback() + raise e + else: + self.cursor.execute("COMMIT") + self.cursor.fetchall() + self._connection_pool.commit() + + def set_current_catalog(self, catalog: str) -> None: + """Sets the catalog name of the current connection.""" + self.execute(exp.Use(this=exp.to_identifier(catalog))) + + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + """Creates a new catalog (database) in GizmoSQL.""" + self.execute( + exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True) + ) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + """Drops a catalog (database) from GizmoSQL.""" + self.execute( + exp.Drop( + this=exp.Table(this=catalog_name), kind="DATABASE", cascade=True, exists=True + ) + ) + + def _df_to_source_queries( + self, + df: DF, + target_columns_to_types: t.Dict[str, exp.DataType], + batch_size: int, + target_table: TableName, + source_columns: t.Optional[t.List[str]] = None, + ) -> t.List[SourceQuery]: + """ + Convert a DataFrame to source 
queries for insertion. + + Uses ADBC bulk ingestion (adbc_ingest) for efficient Arrow-native data transfer + to GizmoSQL, avoiding row-by-row insertion overhead. + """ + import pyarrow as pa + + temp_table = self._get_temp_table(target_table) + + # Select only the source columns in the right order + source_columns_to_types = ( + {col: target_columns_to_types[col] for col in source_columns} + if source_columns + else target_columns_to_types + ) + ordered_df = df[list(source_columns_to_types.keys())] + + # Convert DataFrame to PyArrow Table for bulk ingestion + arrow_table = pa.Table.from_pandas(ordered_df) + + # Use ADBC bulk ingestion with temporary table + # Note: DuckDB temporary tables cannot have catalog/schema prefixes, + # so we only pass the table name when temporary=True + self.cursor.adbc_ingest( + table_name=temp_table.name, + data=arrow_table, + mode="create", + temporary=True, + ) + + # Reference temp table by name only (no schema prefix for temporary tables) + temp_table_name = exp.to_table(temp_table.name) + + return [ + SourceQuery( + query_factory=lambda: self._select_columns(target_columns_to_types).from_( + temp_table_name + ), + cleanup_func=lambda: self.drop_table(temp_table_name), + ) + ] + + def _get_data_objects( + self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None + ) -> t.List[DataObject]: + """ + Returns all the data objects that exist in the given schema and optionally catalog. 
+ """ + catalog = self.get_current_catalog() + + if isinstance(schema_name, exp.Table): + # Ensures we don't generate identifier quotes + schema_name = ".".join(part.name for part in schema_name.parts) + + query = ( + exp.select( + exp.column("table_name").as_("name"), + exp.column("table_schema").as_("schema"), + exp.case(exp.column("table_type")) + .when( + exp.Literal.string("BASE TABLE"), + exp.Literal.string("table"), + ) + .when( + exp.Literal.string("VIEW"), + exp.Literal.string("view"), + ) + .when( + exp.Literal.string("LOCAL TEMPORARY"), + exp.Literal.string("table"), + ) + .as_("type"), + ) + .from_(exp.to_table("information_schema.tables")) + .where( + exp.column("table_catalog").eq(catalog), exp.column("table_schema").eq(schema_name) + ) + ) + if object_names: + query = query.where(exp.column("table_name").isin(*object_names)) + df = self.fetchdf(query) + return [ + DataObject( + catalog=catalog, # type: ignore + schema=row.schema, # type: ignore + name=row.name, # type: ignore + type=DataObjectType.from_str(row.type), # type: ignore + ) + for row in df.itertuples() + ] + + def _normalize_decimal_value(self, col: exp.Expression, precision: int) -> exp.Expression: + """ + GizmoSQL (via DuckDB) truncates instead of rounding when casting to decimal. + + other databases: select cast(3.14159 as decimal(38,3)) -> 3.142 + duckdb: select cast(3.14159 as decimal(38,3)) -> 3.141 + + however, we can get the behaviour of other databases by casting to double first: + select cast(cast(3.14159 as double) as decimal(38, 3)) -> 3.142 + """ + return exp.cast( + exp.cast(col, "DOUBLE"), + f"DECIMAL(38, {precision})", + ) + + def _fetch_native_df( + self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False + ) -> DF: + """ + Fetches a Pandas DataFrame from a SQL query. + + GizmoSQL returns Arrow tables which can be efficiently converted to Pandas. 
+ """ + import pandas as pd + + sql = ( + self._to_sql(query, quote=quote_identifiers) + if isinstance(query, exp.Expression) + else query + ) + + cursor = self.cursor + cursor.execute(sql) + + # ADBC cursors support fetch_arrow_table() for efficient Arrow->Pandas conversion + if hasattr(cursor, "fetch_arrow_table"): + arrow_table = cursor.fetch_arrow_table() + return arrow_table.to_pandas() + + # Fallback to standard fetchall + DataFrame construction + result = cursor.fetchall() + columns = [desc[0] for desc in cursor.description] if cursor.description else [] + return pd.DataFrame(result, columns=columns) diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 4ad6a17944..0d382be927 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -70,6 +70,7 @@ def pytest_marks(self) -> t.List[MarkDecorator]: ENGINES = [ # Docker engines that can be locally tested IntegrationTestEngine("duckdb"), + IntegrationTestEngine("gizmosql"), IntegrationTestEngine("postgres"), IntegrationTestEngine("mysql"), IntegrationTestEngine("mssql"), diff --git a/tests/core/engine_adapter/integration/config.yaml b/tests/core/engine_adapter/integration/config.yaml index 0b1ecd8193..8ee1e7caf7 100644 --- a/tests/core/engine_adapter/integration/config.yaml +++ b/tests/core/engine_adapter/integration/config.yaml @@ -119,6 +119,19 @@ gateways: port: 4566 check_import: false + inttest_gizmosql: + connection: + type: gizmosql + host: {{ env_var('GIZMOSQL_HOST', 'localhost') }} + port: {{ env_var('GIZMOSQL_PORT', '31337') }} + username: {{ env_var('GIZMOSQL_USERNAME', 'gizmosql_username') }} + password: {{ env_var('GIZMOSQL_PASSWORD', 'gizmosql_password') }} + use_encryption: true + disable_certificate_verification: true + check_import: false + state_connection: + type: duckdb + # Cloud databases inttest_snowflake: diff --git 
a/tests/core/engine_adapter/integration/docker/compose.gizmosql.yaml b/tests/core/engine_adapter/integration/docker/compose.gizmosql.yaml new file mode 100644 index 0000000000..d7fe4cb3a7 --- /dev/null +++ b/tests/core/engine_adapter/integration/docker/compose.gizmosql.yaml @@ -0,0 +1,18 @@ +services: + gizmosql: + image: gizmodata/gizmosql:latest + ports: + - '31337:31337' + environment: + GIZMOSQL_USERNAME: gizmosql_username + GIZMOSQL_PASSWORD: gizmosql_password + TLS_ENABLED: "1" + PRINT_QUERIES: "1" + # Use a writable database file instead of the default TPC-H sample + DATABASE_FILENAME: "/tmp/sqlmesh_test.duckdb" + healthcheck: + test: ["CMD-SHELL", "grep -q 'GizmoSQL server - started' /proc/1/fd/1 || exit 1"] + interval: 5s + timeout: 30s + retries: 10 + start_period: 10s diff --git a/tests/core/engine_adapter/integration/test_integration_gizmosql.py b/tests/core/engine_adapter/integration/test_integration_gizmosql.py new file mode 100644 index 0000000000..8480dc82cb --- /dev/null +++ b/tests/core/engine_adapter/integration/test_integration_gizmosql.py @@ -0,0 +1,305 @@ +"""Integration tests for GizmoSQL engine adapter. + +These tests require a running GizmoSQL server with DuckDB backend. +They are marked with the 'gizmosql' and 'docker' pytest markers. +""" + +import typing as t + +import pytest +from sqlglot import exp + +from sqlmesh.core.config.connection import GizmoSQLConnectionConfig +from sqlmesh.core.engine_adapter.gizmosql import GizmoSQLEngineAdapter + +pytestmark = [pytest.mark.gizmosql, pytest.mark.engine, pytest.mark.docker] + + +@pytest.fixture(scope="session") +def gizmosql_config() -> GizmoSQLConnectionConfig: + """Create a GizmoSQL connection config for testing. 
+ + Environment variables can override defaults: + - GIZMOSQL_HOST: hostname (default: localhost) + - GIZMOSQL_PORT: port (default: 31337) + - GIZMOSQL_USERNAME: username (default: gizmosql_username) + - GIZMOSQL_PASSWORD: password (default: gizmosql_password) + """ + import os + + return GizmoSQLConnectionConfig( + host=os.environ.get("GIZMOSQL_HOST", "localhost"), + port=int(os.environ.get("GIZMOSQL_PORT", "31337")), + username=os.environ.get("GIZMOSQL_USERNAME", "gizmosql_username"), + password=os.environ.get("GIZMOSQL_PASSWORD", "gizmosql_password"), + use_encryption=True, + disable_certificate_verification=True, + ) + + +@pytest.fixture(scope="session") +def gizmosql_adapter(gizmosql_config: GizmoSQLConnectionConfig) -> t.Generator[GizmoSQLEngineAdapter, None, None]: + """Create a GizmoSQL engine adapter for testing.""" + adapter = gizmosql_config.create_engine_adapter() + yield adapter + adapter.close() + + +def test_connection(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test that we can connect and execute a simple query.""" + cursor = gizmosql_adapter.connection.cursor() + cursor.execute("SELECT 1") + result = cursor.fetchone() + assert result[0] == 1 + + +def test_dialect(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test that the adapter uses the DuckDB dialect.""" + assert gizmosql_adapter.dialect == "duckdb" + + +def test_create_and_drop_schema(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test creating and dropping schemas.""" + schema_name = "test_gizmosql_schema" + + # Clean up if exists + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + + # Create schema + gizmosql_adapter.create_schema(schema_name) + + # Verify it exists by selecting from information_schema + result = gizmosql_adapter.fetchone( + f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{schema_name}'" + ) + assert result is not None + assert result[0] == schema_name + + # Drop schema + 
gizmosql_adapter.drop_schema(schema_name, cascade=True) + + # Verify it's gone + result = gizmosql_adapter.fetchone( + f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{schema_name}'" + ) + assert result is None + + +def test_create_table_and_insert(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test creating a table and inserting data.""" + schema_name = "test_gizmosql_table_schema" + table_name = f"{schema_name}.test_table" + + try: + # Setup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + gizmosql_adapter.create_schema(schema_name) + + # Create table + columns_to_types = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR"), + "value": exp.DataType.build("DOUBLE"), + } + gizmosql_adapter.create_table(table_name, columns_to_types) + + # Insert data using SQL + gizmosql_adapter.execute( + f"INSERT INTO {table_name} (id, name, value) VALUES (1, 'test', 3.14)" + ) + + # Query data + result = gizmosql_adapter.fetchone(f"SELECT * FROM {table_name}") + assert result is not None + assert result[0] == 1 + assert result[1] == "test" + assert abs(result[2] - 3.14) < 0.001 + + finally: + # Cleanup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + + +def test_fetchdf(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test fetching results as a pandas DataFrame.""" + import pandas as pd + + df = gizmosql_adapter.fetchdf("SELECT 1 as a, 2 as b, 'hello' as c") + + assert isinstance(df, pd.DataFrame) + assert len(df) == 1 + assert list(df.columns) == ["a", "b", "c"] + assert df["a"].iloc[0] == 1 + assert df["b"].iloc[0] == 2 + assert df["c"].iloc[0] == "hello" + + +def test_fetchall(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test fetchall returns list of tuples.""" + result = gizmosql_adapter.fetchall("SELECT 1 as a, 2 as b UNION ALL SELECT 3, 4") + + assert len(result) == 2 + + +def test_fetchone(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test 
fetchone returns a single row.""" + result = gizmosql_adapter.fetchone("SELECT 42 as answer, 'hello' as greeting") + + assert result is not None + assert result[0] == 42 + assert result[1] == "hello" + + +def test_table_exists(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test checking if a table exists.""" + schema_name = "test_gizmosql_exists_schema" + table_name = f"{schema_name}.test_exists_table" + + try: + # Setup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + gizmosql_adapter.create_schema(schema_name) + + # Table should not exist yet + assert not gizmosql_adapter.table_exists(exp.to_table(table_name)) + + # Create table + columns_to_types = {"id": exp.DataType.build("INT")} + gizmosql_adapter.create_table(table_name, columns_to_types) + + # Table should exist now + assert gizmosql_adapter.table_exists(exp.to_table(table_name)) + + finally: + # Cleanup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + + +def test_get_current_catalog(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test getting the current catalog.""" + catalog = gizmosql_adapter.get_current_catalog() + # GizmoSQL should return a catalog name + assert catalog is not None + assert isinstance(catalog, str) + + +def test_get_current_schema(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test getting the current schema via SQL.""" + result = gizmosql_adapter.fetchone("SELECT current_schema()") + assert result is not None + assert isinstance(result[0], str) + + +def test_information_schema_access(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test that we can query information_schema.""" + result = gizmosql_adapter.fetchall( + "SELECT schema_name FROM information_schema.schemata LIMIT 5" + ) + assert result is not None + assert len(result) > 0 + + +def test_complex_query(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test a more complex analytical query with CTEs and aggregations.""" + query = """ + WITH numbers AS ( + 
SELECT unnest(ARRAY[1, 2, 3, 4, 5]) as n + ) + SELECT + SUM(n) as total, + AVG(n) as average, + MIN(n) as minimum, + MAX(n) as maximum + FROM numbers + """ + result = gizmosql_adapter.fetchone(query) + + assert result is not None + assert result[0] == 15 # sum + assert result[1] == 3.0 # avg + assert result[2] == 1 # min + assert result[3] == 5 # max + + +def test_fetchdf_with_types(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test fetching DataFrame with various data types.""" + import pandas as pd + + query = """ + SELECT + 1::INTEGER as int_col, + 3.14::DOUBLE as double_col, + 'test'::VARCHAR as varchar_col, + true::BOOLEAN as bool_col + """ + df = gizmosql_adapter.fetchdf(query) + + assert isinstance(df, pd.DataFrame) + assert len(df) == 1 + assert df["int_col"].iloc[0] == 1 + assert abs(df["double_col"].iloc[0] - 3.14) < 0.001 + assert df["varchar_col"].iloc[0] == "test" + assert df["bool_col"].iloc[0] == True # noqa: E712 - numpy bool comparison + + +def test_query_with_expressions(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test querying using SQLGlot expressions.""" + from sqlglot import select, exp + + query = select( + exp.Literal.number(1).as_("a"), + exp.Literal.string("hello").as_("b"), + ) + + result = gizmosql_adapter.fetchone(query) + assert result is not None + assert result[0] == 1 + assert result[1] == "hello" + + +def test_dataframe_bulk_ingestion(gizmosql_adapter: GizmoSQLEngineAdapter): + """Test bulk DataFrame ingestion using ADBC adbc_ingest.""" + import pandas as pd + + schema_name = "test_bulk_ingest_schema" + table_name = f"{schema_name}.bulk_test_table" + + try: + # Setup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) + gizmosql_adapter.create_schema(schema_name) + + # Create a test DataFrame + df = pd.DataFrame({ + "id": [1, 2, 3, 4, 5], + "name": ["alice", "bob", "charlie", "diana", "eve"], + "value": [10.5, 20.5, 30.5, 40.5, 50.5], + }) + + # Create target table + columns_to_types = { + "id": 
exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR"), + "value": exp.DataType.build("DOUBLE"), + } + gizmosql_adapter.create_table(table_name, columns_to_types) + + # Use replace_query with DataFrame (this uses _df_to_source_queries internally) + gizmosql_adapter.replace_query( + table_name, + df, + columns_to_types, + ) + + # Verify data was loaded + result = gizmosql_adapter.fetchall(f"SELECT * FROM {table_name} ORDER BY id") + assert len(result) == 5 + assert result[0][0] == 1 + assert result[0][1] == "alice" + assert abs(result[0][2] - 10.5) < 0.001 + assert result[4][0] == 5 + assert result[4][1] == "eve" + + finally: + # Cleanup + gizmosql_adapter.drop_schema(schema_name, ignore_if_not_exists=True, cascade=True) diff --git a/tests/core/engine_adapter/test_gizmosql.py b/tests/core/engine_adapter/test_gizmosql.py new file mode 100644 index 0000000000..9adb27a376 --- /dev/null +++ b/tests/core/engine_adapter/test_gizmosql.py @@ -0,0 +1,208 @@ +"""Unit tests for GizmoSQL engine adapter and connection config. + +These tests don't require a running GizmoSQL server - they test +configuration validation and adapter properties. 
"""Unit tests for GizmoSQL engine adapter and connection config.

These tests don't require a running GizmoSQL server - they test
configuration validation and adapter properties.
"""

import typing as t
from unittest.mock import MagicMock, patch

import pytest
from sqlglot import exp

from sqlmesh.core.config.connection import GizmoSQLConnectionConfig
from sqlmesh.core.engine_adapter.gizmosql import GizmoSQLEngineAdapter
from sqlmesh.utils.errors import ConfigError

pytestmark = [pytest.mark.gizmosql]


def _mock_adbc_module(vendor_version: str) -> t.Tuple[MagicMock, MagicMock]:
    """Build a fake ``adbc_driver_flightsql`` module for connection tests.

    The mocked connection reports *vendor_version* from ``adbc_get_info``,
    which is what the connection factory inspects to validate the backend.

    Returns:
        A ``(mock_module, mock_conn)`` pair: the module to patch into
        ``sys.modules`` and the connection it hands out, so tests can inspect
        ``connect`` call arguments and ``close`` behaviour.
    """
    mock_flightsql = MagicMock()
    mock_conn = MagicMock()
    mock_conn.adbc_get_info.return_value = {"vendor_version": vendor_version}
    mock_flightsql.connect.return_value = mock_conn

    # Mock DatabaseOptions enum
    mock_db_options = MagicMock()
    mock_db_options.TLS_SKIP_VERIFY.value = "adbc.flight.sql.client_option.tls_skip_verify"

    mock_module = MagicMock()
    mock_module.dbapi = mock_flightsql
    mock_module.DatabaseOptions = mock_db_options
    return mock_module, mock_conn


class TestGizmoSQLConnectionConfig:
    """Tests for GizmoSQLConnectionConfig."""

    def test_default_values(self):
        """Test default configuration values."""
        config = GizmoSQLConnectionConfig(
            username="user",
            password="pass",
        )

        assert config.host == "localhost"
        assert config.port == 31337
        assert config.use_encryption is True
        assert config.disable_certificate_verification is False
        assert config.database is None
        assert config.concurrent_tasks == 4

    def test_custom_values(self):
        """Test custom configuration values."""
        config = GizmoSQLConnectionConfig(
            host="gizmosql.example.com",
            port=12345,
            username="testuser",
            password="testpass",
            use_encryption=False,
            disable_certificate_verification=True,
            database="mydb",
            concurrent_tasks=8,
        )

        assert config.host == "gizmosql.example.com"
        assert config.port == 12345
        assert config.username == "testuser"
        assert config.password == "testpass"
        assert config.use_encryption is False
        assert config.disable_certificate_verification is True
        assert config.database == "mydb"
        assert config.concurrent_tasks == 8

    def test_dialect_is_duckdb(self):
        """Test that the dialect is set to duckdb."""
        config = GizmoSQLConnectionConfig(
            username="user",
            password="pass",
        )
        assert config.DIALECT == "duckdb"

    def test_type_is_gizmosql(self):
        """Test that the type is set to gizmosql."""
        config = GizmoSQLConnectionConfig(
            username="user",
            password="pass",
        )
        assert config.type_ == "gizmosql"

    def test_get_catalog(self):
        """Test get_catalog returns database."""
        config = GizmoSQLConnectionConfig(
            username="user",
            password="pass",
            database="mydb",
        )
        assert config.get_catalog() == "mydb"

    def test_get_catalog_none(self):
        """Test get_catalog returns None when database not set."""
        config = GizmoSQLConnectionConfig(
            username="user",
            password="pass",
        )
        assert config.get_catalog() is None

    def test_connection_factory_builds_correct_uri_with_tls(self):
        """Test connection factory builds correct URI with TLS."""
        mock_module, _ = _mock_adbc_module("duckdb v1.0.0")

        config = GizmoSQLConnectionConfig(
            host="example.com",
            port=31337,
            username="user",
            password="pass",
            use_encryption=True,
            disable_certificate_verification=False,
        )

        with patch.dict("sys.modules", {"adbc_driver_flightsql": mock_module}):
            factory = config._connection_factory
            factory()

        # Verify connect was called with correct URI (first positional arg)
        call_args = mock_module.dbapi.connect.call_args
        assert call_args[0][0] == "grpc+tls://example.com:31337"
        # Verify db_kwargs contains credentials
        db_kwargs = call_args[1]["db_kwargs"]
        assert db_kwargs["username"] == "user"
        assert db_kwargs["password"] == "pass"

    def test_connection_factory_builds_correct_uri_without_tls(self):
        """Test connection factory builds correct URI without TLS."""
        mock_module, _ = _mock_adbc_module("duckdb v1.0.0")

        config = GizmoSQLConnectionConfig(
            host="example.com",
            port=31337,
            username="user",
            password="pass",
            use_encryption=False,
        )

        with patch.dict("sys.modules", {"adbc_driver_flightsql": mock_module}):
            factory = config._connection_factory
            factory()

        # Plain grpc scheme (no +tls) when encryption is disabled.
        call_args = mock_module.dbapi.connect.call_args
        assert call_args[0][0] == "grpc://example.com:31337"

    def test_connection_factory_rejects_non_duckdb_backend(self):
        """Test connection factory raises error for non-DuckDB backend."""
        mock_module, mock_conn = _mock_adbc_module("sqlite v3.40.0")

        config = GizmoSQLConnectionConfig(
            host="example.com",
            port=31337,
            username="user",
            password="pass",
        )

        with patch.dict("sys.modules", {"adbc_driver_flightsql": mock_module}):
            factory = config._connection_factory
            with pytest.raises(ConfigError, match="Unsupported GizmoSQL server backend"):
                factory()

        # Verify connection was closed after rejection
        mock_conn.close.assert_called_once()


class TestGizmoSQLEngineAdapter:
    """Tests for GizmoSQLEngineAdapter properties."""

    def test_dialect(self):
        """Test that dialect is duckdb."""
        assert GizmoSQLEngineAdapter.DIALECT == "duckdb"

    def test_supports_transactions(self):
        """Test transactions are supported via SQL statements."""
        assert GizmoSQLEngineAdapter.SUPPORTS_TRANSACTIONS is True

    def test_supports_create_drop_catalog(self):
        """Test catalog operations are supported."""
        assert GizmoSQLEngineAdapter.SUPPORTS_CREATE_DROP_CATALOG is True

    def test_supported_drop_cascade_object_kinds(self):
        """Test cascade drop is supported for schemas, tables, and views."""
        assert "SCHEMA" in GizmoSQLEngineAdapter.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
        assert "TABLE" in GizmoSQLEngineAdapter.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
        assert "VIEW" in GizmoSQLEngineAdapter.SUPPORTED_DROP_CASCADE_OBJECT_KINDS