diff --git a/sqlmesh/core/state_sync/db/facade.py b/sqlmesh/core/state_sync/db/facade.py index 3c23ef339c..ad357f8c57 100644 --- a/sqlmesh/core/state_sync/db/facade.py +++ b/sqlmesh/core/state_sync/db/facade.py @@ -456,7 +456,7 @@ def migrate( ) -> None: """Migrate the state sync to the latest SQLMesh / SQLGlot version.""" self.migrator.migrate( - self, + self.schema, skip_backup=skip_backup, promoted_snapshots_only=promoted_snapshots_only, ) diff --git a/sqlmesh/core/state_sync/db/migrator.py b/sqlmesh/core/state_sync/db/migrator.py index 3e3f978b96..ad60c57570 100644 --- a/sqlmesh/core/state_sync/db/migrator.py +++ b/sqlmesh/core/state_sync/db/migrator.py @@ -30,7 +30,6 @@ MIN_SCHEMA_VERSION, MIN_SQLMESH_VERSION, ) -from sqlmesh.core.state_sync.base import StateSync from sqlmesh.core.state_sync.db.environment import EnvironmentState from sqlmesh.core.state_sync.db.interval import IntervalState from sqlmesh.core.state_sync.db.snapshot import SnapshotState @@ -85,7 +84,7 @@ def __init__( def migrate( self, - state_sync: StateSync, + schema: t.Optional[str], skip_backup: bool = False, promoted_snapshots_only: bool = True, ) -> None: @@ -94,7 +93,7 @@ def migrate( migration_start_ts = time.perf_counter() try: - migrate_rows = self._apply_migrations(state_sync, skip_backup) + migrate_rows = self._apply_migrations(schema, skip_backup) if not migrate_rows and major_minor(SQLMESH_VERSION) == versions.minor_sqlmesh_version: return @@ -153,7 +152,7 @@ def rollback(self) -> None: def _apply_migrations( self, - state_sync: StateSync, + schema: t.Optional[str], skip_backup: bool, ) -> bool: versions = self.version_state.get_versions() @@ -184,10 +183,10 @@ def _apply_migrations( for migration in migrations: logger.info(f"Applying migration {migration}") - migration.migrate_schemas(state_sync) + migration.migrate_schemas(engine_adapter=self.engine_adapter, schema=schema) if state_table_exist: # No need to run DML for the initial migration since all tables are empty - migration.migrate_rows(state_sync) + migration.migrate_rows(engine_adapter=self.engine_adapter, schema=schema) snapshot_count_after = self.snapshot_state.count() diff --git a/sqlmesh/migrations/v0000_baseline.py b/sqlmesh/migrations/v0000_baseline.py index 4891900a76..abd316fcfe 100644 --- a/sqlmesh/migrations/v0000_baseline.py +++ b/sqlmesh/migrations/v0000_baseline.py @@ -4,15 +4,12 @@ from sqlmesh.utils.migration import blob_text_type, index_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - schema = state_sync.schema - engine_adapter = state_sync.engine_adapter - +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore intervals_table = "_intervals" snapshots_table = "_snapshots" environments_table = "_environments" versions_table = "_versions" - if state_sync.schema: + if schema: engine_adapter.create_schema(schema) intervals_table = f"{schema}.{intervals_table}" snapshots_table = f"{schema}.{snapshots_table}" @@ -94,5 +91,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version")) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py b/sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py index 34b765b3ad..897974f09a 100644 --- a/sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py +++ b/sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py @@ -9,12 +9,9 @@ from sqlmesh.utils.migration import blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore if engine_adapter.dialect != "mysql": return - - schema = state_sync.schema environments_table = "_environments" snapshots_table = "_snapshots" @@ -46,5 +43,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0062_add_model_gateway.py b/sqlmesh/migrations/v0062_add_model_gateway.py index 524a94044a..f65d8224ec 100644 --- a/sqlmesh/migrations/v0062_add_model_gateway.py +++ b/sqlmesh/migrations/v0062_add_model_gateway.py @@ -1,9 +1,9 @@ """Add the gateway model attribute.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0063_change_signals.py b/sqlmesh/migrations/v0063_change_signals.py index 8806c9ea60..bbced547fd 100644 --- a/sqlmesh/migrations/v0063_change_signals.py +++ b/sqlmesh/migrations/v0063_change_signals.py @@ -7,15 +7,13 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" index_type = index_text_type(engine_adapter.dialect) if schema: diff --git a/sqlmesh/migrations/v0064_join_when_matched_strings.py b/sqlmesh/migrations/v0064_join_when_matched_strings.py index 6da3164a38..ffd4c94913 100644 --- a/sqlmesh/migrations/v0064_join_when_matched_strings.py +++ b/sqlmesh/migrations/v0064_join_when_matched_strings.py @@ -7,15 +7,13 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" index_type = index_text_type(engine_adapter.dialect) if schema: diff --git a/sqlmesh/migrations/v0065_add_model_optimize.py b/sqlmesh/migrations/v0065_add_model_optimize.py index 09240aa61e..e9bc646666 100644 --- a/sqlmesh/migrations/v0065_add_model_optimize.py +++ b/sqlmesh/migrations/v0065_add_model_optimize.py @@ -1,9 +1,9 @@ """Add the optimize_query model attribute.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0066_add_auto_restatements.py b/sqlmesh/migrations/v0066_add_auto_restatements.py index 96d2cd45e8..9eea773573 100644 --- a/sqlmesh/migrations/v0066_add_auto_restatements.py +++ b/sqlmesh/migrations/v0066_add_auto_restatements.py @@ -5,9 +5,7 @@ from sqlmesh.utils.migration import index_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore auto_restatements_table = "_auto_restatements" intervals_table = "_intervals" @@ -40,9 +38,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore intervals_table = "_intervals" if schema: diff --git a/sqlmesh/migrations/v0067_add_tsql_date_full_precision.py b/sqlmesh/migrations/v0067_add_tsql_date_full_precision.py index d4fd93eda4..1243118df0 100644 --- a/sqlmesh/migrations/v0067_add_tsql_date_full_precision.py +++ b/sqlmesh/migrations/v0067_add_tsql_date_full_precision.py @@ -1,9 +1,9 @@ """Add full precision for tsql to support nanoseconds.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0068_include_unrendered_query_in_metadata_hash.py b/sqlmesh/migrations/v0068_include_unrendered_query_in_metadata_hash.py index 6f7ddbdc1c..35142e9aeb 100644 --- a/sqlmesh/migrations/v0068_include_unrendered_query_in_metadata_hash.py +++ b/sqlmesh/migrations/v0068_include_unrendered_query_in_metadata_hash.py @@ -1,9 +1,9 @@ """Include the unrendered query in the metadata hash.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0069_update_dev_table_suffix.py b/sqlmesh/migrations/v0069_update_dev_table_suffix.py index 57b41a816c..f69aac434e 100644 --- a/sqlmesh/migrations/v0069_update_dev_table_suffix.py +++ b/sqlmesh/migrations/v0069_update_dev_table_suffix.py @@ -7,15 +7,13 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" environments_table = "_environments" if schema: diff --git a/sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py b/sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py index 4b339d8e97..d0dbdd5563 100644 --- a/sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py +++ b/sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py @@ -1,9 +1,9 @@ """Include grains in the metadata hash.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0071_add_dev_version_to_intervals.py b/sqlmesh/migrations/v0071_add_dev_version_to_intervals.py index 4e6cbab4f0..61a49dc0b9 100644 --- a/sqlmesh/migrations/v0071_add_dev_version_to_intervals.py +++ b/sqlmesh/migrations/v0071_add_dev_version_to_intervals.py @@ -8,9 +8,7 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore intervals_table = "_intervals" if schema: intervals_table = f"{schema}.{intervals_table}" @@ -29,9 +27,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore intervals_table = "_intervals" snapshots_table = "_snapshots" if schema: diff --git a/sqlmesh/migrations/v0072_add_environment_statements.py b/sqlmesh/migrations/v0072_add_environment_statements.py index e73faf2b9a..4ed52b5c47 100644 --- a/sqlmesh/migrations/v0072_add_environment_statements.py +++ b/sqlmesh/migrations/v0072_add_environment_statements.py @@ -5,9 +5,7 @@ from sqlmesh.utils.migration import blob_text_type, index_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore environment_statements_table = "_environment_statements" if schema: @@ -27,5 +25,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore ) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0073_remove_symbolic_disable_restatement.py b/sqlmesh/migrations/v0073_remove_symbolic_disable_restatement.py index 40e74d6426..708693ed61 100644 --- a/sqlmesh/migrations/v0073_remove_symbolic_disable_restatement.py +++ b/sqlmesh/migrations/v0073_remove_symbolic_disable_restatement.py @@ -6,15 +6,13 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py b/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py index 04f1a27254..acd349c888 100644 --- a/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py +++ b/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py @@ -2,9 +2,9 @@ (default: True to keep the original behaviour)""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0075_remove_validate_query.py b/sqlmesh/migrations/v0075_remove_validate_query.py index f6d4e255d9..9fdcca7ea6 100644 --- a/sqlmesh/migrations/v0075_remove_validate_query.py +++ b/sqlmesh/migrations/v0075_remove_validate_query.py @@ -8,15 +8,13 @@ from sqlmesh.utils.migration import blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" index_type = index_text_type(engine_adapter.dialect) if schema: diff --git a/sqlmesh/migrations/v0076_add_cron_tz.py b/sqlmesh/migrations/v0076_add_cron_tz.py index 300474aa18..909017c8cd 100644 --- a/sqlmesh/migrations/v0076_add_cron_tz.py +++ b/sqlmesh/migrations/v0076_add_cron_tz.py @@ -1,9 +1,9 @@ """Add 'cron_tz' property to node definition.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py b/sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py index 2aec1140f1..68953836bd 100644 --- a/sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py +++ b/sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py @@ -1,9 +1,9 @@ """Use the model's dialect when calculating the hash for the column types.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py b/sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py index c24b6a5168..adf1e96dd0 100644 --- a/sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py +++ b/sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py @@ -24,13 +24,11 @@ from sqlmesh.core.console import get_console -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0079_add_gateway_managed_property.py b/sqlmesh/migrations/v0079_add_gateway_managed_property.py index 8d24601102..7650d6d765 100644 --- a/sqlmesh/migrations/v0079_add_gateway_managed_property.py +++ b/sqlmesh/migrations/v0079_add_gateway_managed_property.py @@ -3,11 +3,10 @@ from sqlglot import exp -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore environments_table = "_environments" - if state_sync.schema: - environments_table = f"{state_sync.schema}.{environments_table}" + if schema: + environments_table = f"{schema}.{environments_table}" alter_table_exp = exp.Alter( this=exp.to_table(environments_table), @@ -22,13 +21,12 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore environments_table = "_environments" - if state_sync.schema: - environments_table = f"{state_sync.schema}.{environments_table}" + if schema: + environments_table = f"{schema}.{environments_table}" - state_sync.engine_adapter.update_table( + engine_adapter.update_table( environments_table, {"gateway_managed": False}, where=exp.true(), diff --git a/sqlmesh/migrations/v0080_add_batch_size_to_scd_type_2_models.py b/sqlmesh/migrations/v0080_add_batch_size_to_scd_type_2_models.py index 582bdd3da9..35cb3977cc 100644 --- a/sqlmesh/migrations/v0080_add_batch_size_to_scd_type_2_models.py +++ b/sqlmesh/migrations/v0080_add_batch_size_to_scd_type_2_models.py @@ -1,9 +1,9 @@ """Add batch_size to SCD Type 2 models and add updated_at_name to by time which changes their data hash.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0081_update_partitioned_by.py b/sqlmesh/migrations/v0081_update_partitioned_by.py index 611d8f6973..8740285bf0 100644 --- a/sqlmesh/migrations/v0081_update_partitioned_by.py +++ b/sqlmesh/migrations/v0081_update_partitioned_by.py @@ -8,15 +8,13 @@ from sqlmesh.utils.migration import blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" index_type = index_text_type(engine_adapter.dialect) if schema: diff --git a/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py b/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py index 6eadbfc2c3..5565b099cd 100644 --- a/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py +++ b/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py @@ -34,13 +34,11 @@ from sqlmesh.core.console import get_console -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0083_use_sql_for_scd_time_data_type_data_hash.py b/sqlmesh/migrations/v0083_use_sql_for_scd_time_data_type_data_hash.py index 38c84afafd..5dbe0847f9 100644 --- a/sqlmesh/migrations/v0083_use_sql_for_scd_time_data_type_data_hash.py +++ b/sqlmesh/migrations/v0083_use_sql_for_scd_time_data_type_data_hash.py @@ -1,9 +1,9 @@ """Use sql(...) instead of gen when computing the data hash of the time data type.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0084_normalize_quote_when_matched_and_merge_filter.py b/sqlmesh/migrations/v0084_normalize_quote_when_matched_and_merge_filter.py index 5401c97d77..9edb0051ba 100644 --- a/sqlmesh/migrations/v0084_normalize_quote_when_matched_and_merge_filter.py +++ b/sqlmesh/migrations/v0084_normalize_quote_when_matched_and_merge_filter.py @@ -5,9 +5,9 @@ """ -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0085_deterministic_repr.py b/sqlmesh/migrations/v0085_deterministic_repr.py index 1a90277bbe..81cb0f194e 100644 --- a/sqlmesh/migrations/v0085_deterministic_repr.py +++ b/sqlmesh/migrations/v0085_deterministic_repr.py @@ -36,15 +36,13 @@ def _dict_sort(obj: t.Any) -> str: return repr(obj) -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0086_check_deterministic_bug.py b/sqlmesh/migrations/v0086_check_deterministic_bug.py index 0679414881..f44e5b8e33 100644 --- a/sqlmesh/migrations/v0086_check_deterministic_bug.py +++ b/sqlmesh/migrations/v0086_check_deterministic_bug.py @@ -10,13 +10,11 @@ KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"] -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" versions_table = "_versions" if schema: diff --git a/sqlmesh/migrations/v0087_normalize_blueprint_variables.py b/sqlmesh/migrations/v0087_normalize_blueprint_variables.py index 2f23a0653e..fe737861c2 100644 --- a/sqlmesh/migrations/v0087_normalize_blueprint_variables.py +++ b/sqlmesh/migrations/v0087_normalize_blueprint_variables.py @@ -35,15 +35,13 @@ class SqlValue: sql: str -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0088_warn_about_variable_python_env_diffs.py b/sqlmesh/migrations/v0088_warn_about_variable_python_env_diffs.py index 405aad725f..0aa7171821 100644 --- a/sqlmesh/migrations/v0088_warn_about_variable_python_env_diffs.py +++ b/sqlmesh/migrations/v0088_warn_about_variable_python_env_diffs.py @@ -35,13 +35,11 @@ METADATA_HASH_EXPRESSIONS = {"on_virtual_update", "audits", "signals", "audit_definitions"} -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0089_add_virtual_environment_mode.py b/sqlmesh/migrations/v0089_add_virtual_environment_mode.py index 63d491418f..88126c76d7 100644 --- a/sqlmesh/migrations/v0089_add_virtual_environment_mode.py +++ b/sqlmesh/migrations/v0089_add_virtual_environment_mode.py @@ -1,9 +1,9 @@ """Add virtual_environment_mode to the model definition.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0090_add_forward_only_column.py b/sqlmesh/migrations/v0090_add_forward_only_column.py index b68c0f65ea..48253691ec 100644 --- a/sqlmesh/migrations/v0090_add_forward_only_column.py +++ b/sqlmesh/migrations/v0090_add_forward_only_column.py @@ -7,9 +7,7 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" @@ -27,11 +25,9 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0091_on_additive_change.py b/sqlmesh/migrations/v0091_on_additive_change.py index c0170bd438..e24b9b4122 100644 --- a/sqlmesh/migrations/v0091_on_additive_change.py +++ b/sqlmesh/migrations/v0091_on_additive_change.py @@ -1,9 +1,9 @@ """Add on_additive_change to incremental model metadata hash.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0092_warn_about_dbt_data_type_diff.py b/sqlmesh/migrations/v0092_warn_about_dbt_data_type_diff.py index 1ff069bc82..02e2a5f4c1 100644 --- a/sqlmesh/migrations/v0092_warn_about_dbt_data_type_diff.py +++ b/sqlmesh/migrations/v0092_warn_about_dbt_data_type_diff.py @@ -17,13 +17,11 @@ SQLMESH_DBT_PACKAGE = "sqlmesh.dbt" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0093_use_raw_sql_in_fingerprint.py b/sqlmesh/migrations/v0093_use_raw_sql_in_fingerprint.py index f629c1d27d..aaaacf3a91 100644 --- a/sqlmesh/migrations/v0093_use_raw_sql_in_fingerprint.py +++ b/sqlmesh/migrations/v0093_use_raw_sql_in_fingerprint.py @@ -1,9 +1,9 @@ """Use the raw SQL when computing the model fingerprint.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0094_add_dev_version_and_fingerprint_columns.py b/sqlmesh/migrations/v0094_add_dev_version_and_fingerprint_columns.py index 1abc4fa4af..9d7adf21a3 100644 --- a/sqlmesh/migrations/v0094_add_dev_version_and_fingerprint_columns.py +++ b/sqlmesh/migrations/v0094_add_dev_version_and_fingerprint_columns.py @@ -7,9 +7,7 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" @@ -42,11 +40,9 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(add_fingerprint_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0095_warn_about_dbt_raw_sql_diff.py b/sqlmesh/migrations/v0095_warn_about_dbt_raw_sql_diff.py index 802d996df5..0fa9fd51b8 100644 --- a/sqlmesh/migrations/v0095_warn_about_dbt_raw_sql_diff.py +++ b/sqlmesh/migrations/v0095_warn_about_dbt_raw_sql_diff.py @@ -17,13 +17,11 @@ SQLMESH_DBT_PACKAGE = "sqlmesh.dbt" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0096_remove_plan_dags_table.py b/sqlmesh/migrations/v0096_remove_plan_dags_table.py index e342d6b1a8..8eb674ead0 100644 --- a/sqlmesh/migrations/v0096_remove_plan_dags_table.py +++ b/sqlmesh/migrations/v0096_remove_plan_dags_table.py @@ -1,9 +1,7 @@ """Remove the obsolete _plan_dags table.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore plan_dags_table = "_plan_dags" if schema: plan_dags_table = f"{schema}.{plan_dags_table}" @@ -11,5 +9,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.drop_table(plan_dags_table) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0097_add_dbt_name_in_node.py b/sqlmesh/migrations/v0097_add_dbt_name_in_node.py index f8909e4430..cd548977ef 100644 --- a/sqlmesh/migrations/v0097_add_dbt_name_in_node.py +++ b/sqlmesh/migrations/v0097_add_dbt_name_in_node.py @@ -1,9 +1,9 @@ """Add 'dbt_name' property to node definition.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py b/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py index c8acd0bafd..b69ba8fa6f 100644 --- a/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py +++ b/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py @@ -5,15 +5,13 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore import pandas as pd - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema snapshots_table = "_snapshots" if schema: snapshots_table = f"{schema}.{snapshots_table}" diff --git a/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py b/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py index 1a119a338d..b80ed35a35 100644 --- a/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py +++ b/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py @@ -3,9 +3,7 @@ from sqlglot import exp -def migrate_schemas(state_sync, **kwargs): # type: ignore - engine_adapter = state_sync.engine_adapter - schema = state_sync.schema +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore intervals_table = "_intervals" if schema: intervals_table = f"{schema}.{intervals_table}" @@ -23,5 +21,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter.execute(alter_table_exp) -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass diff --git a/sqlmesh/migrations/v0100_add_grants_and_grants_target_layer.py b/sqlmesh/migrations/v0100_add_grants_and_grants_target_layer.py index fa23935da0..9ff64c5e57 100644 --- a/sqlmesh/migrations/v0100_add_grants_and_grants_target_layer.py +++ b/sqlmesh/migrations/v0100_add_grants_and_grants_target_layer.py @@ -1,9 +1,9 @@ """Add grants and grants_target_layer to incremental model metadata hash.""" -def migrate_schemas(state_sync, **kwargs): # type: ignore +def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore pass -def migrate_rows(state_sync, **kwargs): # type: ignore +def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore pass