Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqlmesh/core/state_sync/db/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def migrate(
) -> None:
"""Migrate the state sync to the latest SQLMesh / SQLGlot version."""
self.migrator.migrate(
self,
self.schema,
skip_backup=skip_backup,
promoted_snapshots_only=promoted_snapshots_only,
)
Expand Down
11 changes: 5 additions & 6 deletions sqlmesh/core/state_sync/db/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
MIN_SCHEMA_VERSION,
MIN_SQLMESH_VERSION,
)
from sqlmesh.core.state_sync.base import StateSync
from sqlmesh.core.state_sync.db.environment import EnvironmentState
from sqlmesh.core.state_sync.db.interval import IntervalState
from sqlmesh.core.state_sync.db.snapshot import SnapshotState
Expand Down Expand Up @@ -85,7 +84,7 @@ def __init__(

def migrate(
self,
state_sync: StateSync,
schema: t.Optional[str],
skip_backup: bool = False,
promoted_snapshots_only: bool = True,
) -> None:
Expand All @@ -94,7 +93,7 @@ def migrate(
migration_start_ts = time.perf_counter()

try:
migrate_rows = self._apply_migrations(state_sync, skip_backup)
migrate_rows = self._apply_migrations(schema, skip_backup)

if not migrate_rows and major_minor(SQLMESH_VERSION) == versions.minor_sqlmesh_version:
return
Expand Down Expand Up @@ -153,7 +152,7 @@ def rollback(self) -> None:

def _apply_migrations(
self,
state_sync: StateSync,
schema: t.Optional[str],
skip_backup: bool,
) -> bool:
versions = self.version_state.get_versions()
Expand Down Expand Up @@ -184,10 +183,10 @@ def _apply_migrations(

for migration in migrations:
logger.info(f"Applying migration {migration}")
migration.migrate_schemas(state_sync)
migration.migrate_schemas(engine_adapter=self.engine_adapter, schema=schema)
if state_table_exist:
# No need to run DML for the initial migration since all tables are empty
migration.migrate_rows(state_sync)
migration.migrate_rows(engine_adapter=self.engine_adapter, schema=schema)

snapshot_count_after = self.snapshot_state.count()

Expand Down
9 changes: 3 additions & 6 deletions sqlmesh/migrations/v0000_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
from sqlmesh.utils.migration import blob_text_type, index_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
schema = state_sync.schema
engine_adapter = state_sync.engine_adapter

def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
intervals_table = "_intervals"
snapshots_table = "_snapshots"
environments_table = "_environments"
versions_table = "_versions"
if state_sync.schema:
if schema:
engine_adapter.create_schema(schema)
intervals_table = f"{schema}.{intervals_table}"
snapshots_table = f"{schema}.{snapshots_table}"
Expand Down Expand Up @@ -94,5 +91,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
7 changes: 2 additions & 5 deletions sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@
from sqlmesh.utils.migration import blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
if engine_adapter.dialect != "mysql":
return

schema = state_sync.schema
environments_table = "_environments"
snapshots_table = "_snapshots"

Expand Down Expand Up @@ -46,5 +43,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0062_add_model_gateway.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Add the gateway model attribute."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0063_change_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
index_type = index_text_type(engine_adapter.dialect)
if schema:
Expand Down
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0064_join_when_matched_strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
index_type = index_text_type(engine_adapter.dialect)
if schema:
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0065_add_model_optimize.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Add the optimize_query model attribute."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
8 changes: 2 additions & 6 deletions sqlmesh/migrations/v0066_add_auto_restatements.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
auto_restatements_table = "_auto_restatements"
intervals_table = "_intervals"

Expand Down Expand Up @@ -40,9 +38,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
intervals_table = "_intervals"

if schema:
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0067_add_tsql_date_full_precision.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Add full precision for tsql to support nanoseconds."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Include the unrendered query in the metadata hash."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0069_update_dev_table_suffix.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
environments_table = "_environments"
if schema:
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Include grains in the metadata hash."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
8 changes: 2 additions & 6 deletions sqlmesh/migrations/v0071_add_dev_version_to_intervals.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
intervals_table = "_intervals"
if schema:
intervals_table = f"{schema}.{intervals_table}"
Expand All @@ -29,9 +27,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
intervals_table = "_intervals"
snapshots_table = "_snapshots"
if schema:
Expand Down
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0072_add_environment_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from sqlmesh.utils.migration import blob_text_type, index_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
environment_statements_table = "_environment_statements"

if schema:
Expand All @@ -27,5 +25,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
)


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
(default: True to keep the original behaviour)"""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0075_remove_validate_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@
from sqlmesh.utils.migration import blob_text_type


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
index_type = index_text_type(engine_adapter.dialect)
if schema:
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0076_add_cron_tz.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Add 'cron_tz' property to node definition."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
4 changes: 2 additions & 2 deletions sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Use the model's dialect when calculating the hash for the column types."""


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
pass
6 changes: 2 additions & 4 deletions sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
from sqlmesh.core.console import get_console


def migrate_schemas(state_sync, **kwargs): # type: ignore
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"
Expand Down
16 changes: 7 additions & 9 deletions sqlmesh/migrations/v0079_add_gateway_managed_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
from sqlglot import exp


def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
environments_table = "_environments"
if state_sync.schema:
environments_table = f"{state_sync.schema}.{environments_table}"
if schema:
environments_table = f"{schema}.{environments_table}"

alter_table_exp = exp.Alter(
this=exp.to_table(environments_table),
Expand All @@ -22,13 +21,12 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
engine_adapter.execute(alter_table_exp)


def migrate_rows(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
environments_table = "_environments"
if state_sync.schema:
environments_table = f"{state_sync.schema}.{environments_table}"
if schema:
environments_table = f"{schema}.{environments_table}"

state_sync.engine_adapter.update_table(
engine_adapter.update_table(
environments_table,
{"gateway_managed": False},
where=exp.true(),
Expand Down
Loading