Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqlmesh/core/config/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Config(BaseConfig):
}

_connection_config_validator = connection_config_validator
_scheduler_config_validator = scheduler_config_validator
_scheduler_config_validator = scheduler_config_validator # type: ignore
_variables_validator = variables_validator

@field_validator("gateways", mode="before")
Expand Down
311 changes: 238 additions & 73 deletions sqlmesh/core/console.py

Large diffs are not rendered by default.

38 changes: 31 additions & 7 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
from sqlmesh.core.macros import RuntimeStage
from sqlmesh.core.model.definition import AuditResult
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.notification_target import (
NotificationEvent,
Expand Down Expand Up @@ -166,7 +167,7 @@ def evaluate(
deployability_index: DeployabilityIndex,
batch_index: int,
**kwargs: t.Any,
) -> None:
) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]:
"""Evaluate a snapshot and add the processed interval to the state sync.

Args:
Expand All @@ -178,6 +179,9 @@ def evaluate(
batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
auto_restatement_enabled: Whether to enable auto restatements.
kwargs: Additional kwargs to pass to the renderer.

Returns:
Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn.
"""
validate_date_range(start, end)

Expand Down Expand Up @@ -207,6 +211,7 @@ def evaluate(
)

audit_errors_to_raise: t.List[AuditError] = []
audit_errors_to_warn: t.List[AuditError] = []
for audit_result in (result for result in audit_results if result.count):
error = AuditError(
audit_name=audit_result.audit.name,
Expand All @@ -224,15 +229,13 @@ def evaluate(
if audit_result.blocking:
audit_errors_to_raise.append(error)
else:
get_console().log_warning(
f"\n{error}.",
long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
)
audit_errors_to_warn.append(error)

if audit_errors_to_raise:
raise NodeAuditsErrors(audit_errors_to_raise)

self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)
return audit_results, audit_errors_to_warn

def run(
self,
Expand Down Expand Up @@ -463,21 +466,42 @@ def evaluate_node(node: SchedulingUnit) -> None:
execution_start_ts = now_timestamp()
evaluation_duration_ms: t.Optional[int] = None

audit_results: t.List[AuditResult] = []
audit_errors_to_warn: t.List[AuditError] = []
try:
assert execution_time # mypy
assert deployability_index # mypy
self.evaluate(
audit_results, audit_errors_to_warn = self.evaluate(
snapshot=snapshot,
start=start,
end=end,
execution_time=execution_time,
deployability_index=deployability_index,
batch_index=batch_idx,
)

for audit_error in audit_errors_to_warn:
display_name = snapshot.display_name(
environment_naming_info,
self.default_catalog,
self.snapshot_evaluator.adapter.dialect,
)
self.console.log_warning(
f"\n{display_name}: {audit_error}.",
f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}",
)

evaluation_duration_ms = now_timestamp() - execution_start_ts
finally:
num_audits = len(audit_results)
num_audits_failed = sum(1 for result in audit_results if result.count)
self.console.update_snapshot_evaluation_progress(
snapshot, batch_idx, evaluation_duration_ms
snapshot,
batched_intervals[snapshot][batch_idx],
batch_idx,
evaluation_duration_ms,
num_audits - num_audits_failed,
num_audits_failed,
)

try:
Expand Down
9 changes: 9 additions & 0 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,15 @@ def display_name(
Returns the model name as a qualified view name.
This is just used for presenting information back to the user and `qualified_view_name` should be used
when wanting a view name in all other cases.

Args:
snapshot_info_like: The snapshot info object to get the display name for
environment_naming_info: Environment naming info to use for display name formatting
default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
dialect: Optional dialect type to use for name formatting

Returns:
The formatted display name as a string
"""
if snapshot_info_like.is_audit:
return snapshot_info_like.name
Expand Down
54 changes: 27 additions & 27 deletions tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,22 @@ def assert_new_env(result, new_env="prod", from_env="prod", initialize=True) ->
) in result.output


def assert_model_versions_created(result) -> None:
assert "Model versions created successfully" in result.output
def assert_physical_layer_updated(result) -> None:
assert "Physical layer updated" in result.output


def assert_model_batches_executed(result) -> None:
assert "Model batches executed successfully" in result.output
assert "Model batches executed" in result.output


def assert_target_env_updated(result) -> None:
assert "Target environment updated successfully" in result.output
def assert_virtual_layer_updated(result) -> None:
assert "Virtual layer updated" in result.output


def assert_backfill_success(result) -> None:
assert_model_versions_created(result)
assert_physical_layer_updated(result)
assert_model_batches_executed(result)
assert_target_env_updated(result)
assert_virtual_layer_updated(result)


def assert_plan_success(result, new_env="prod", from_env="prod") -> None:
Expand All @@ -154,7 +154,7 @@ def assert_plan_success(result, new_env="prod", from_env="prod") -> None:


def assert_virtual_update(result) -> None:
assert "Virtual Update executed successfully" in result.output
assert "Virtual Update executed" in result.output


def test_version(runner, tmp_path):
Expand Down Expand Up @@ -242,9 +242,9 @@ def test_plan_restate_model(runner, tmp_path):
assert result.exit_code == 0
assert_duckdb_test(result)
assert "No changes to plan: project files match the `prod` environment" in result.output
assert "sqlmesh_example.full_model evaluated in" in result.output
assert "sqlmesh_example.full_model [full refresh" in result.output
assert_model_batches_executed(result)
assert_target_env_updated(result)
assert_virtual_layer_updated(result)


@pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"])
Expand All @@ -268,7 +268,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag):
)
assert result.exit_code == 0
assert_virtual_update(result)
assert "Model batches executed successfully" not in result.output
assert "Model batches executed" not in result.output


def test_plan_auto_apply(runner, tmp_path):
Expand All @@ -282,7 +282,7 @@ def test_plan_auto_apply(runner, tmp_path):

# confirm verbose output not present
assert "sqlmesh_example.seed_model created" not in result.output
assert "sqlmesh_example.seed_model promoted" not in result.output
assert "sqlmesh_example.seed_model updated" not in result.output


def test_plan_verbose(runner, tmp_path):
Expand All @@ -293,8 +293,8 @@ def test_plan_verbose(runner, tmp_path):
cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--verbose"], input="y\n"
)
assert_plan_success(result)
assert "sqlmesh_example.seed_model created" in result.output
assert "sqlmesh_example.seed_model promoted" in result.output
assert "sqlmesh_example.seed_model created" in result.output
assert "sqlmesh_example.seed_model promoted" in result.output


def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path):
Expand Down Expand Up @@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path):
)
assert result.exit_code == 0
assert_new_env(result, "dev2", "dev", initialize=False)
assert_target_env_updated(result)
assert_virtual_layer_updated(result)
assert_virtual_update(result)


Expand Down Expand Up @@ -495,9 +495,9 @@ def test_plan_dev_no_prompts(runner, tmp_path):
cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "dev", "--no-prompts"]
)
assert "Apply - Backfill Tables [y/n]: " in result.output
assert "Model versions created successfully" not in result.output
assert "Model batches executed successfully" not in result.output
assert "The target environment has been updated successfully" not in result.output
assert "Physical layer updated" not in result.output
assert "Model batches executed" not in result.output
assert "The target environment has been updated" not in result.output


def test_plan_dev_auto_apply(runner, tmp_path):
Expand Down Expand Up @@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path):
)
assert result.exit_code == 0
assert_new_env(result, "dev", initialize=False)
assert_target_env_updated(result)
assert_virtual_layer_updated(result)
assert_virtual_update(result)


Expand All @@ -552,8 +552,8 @@ def test_plan_nonbreaking(runner, tmp_path):
assert "+ 'a' AS new_col" in result.output
assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output
assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output
assert "sqlmesh_example.incremental_model evaluated in" in result.output
assert "sqlmesh_example.full_model evaluated in" not in result.output
assert "sqlmesh_example.incremental_model [insert" in result.output
assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output
assert_backfill_success(result)


Expand Down Expand Up @@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path):
assert result.exit_code == 0
assert "+ item_id + 1 AS item_id," in result.output
assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output
assert "sqlmesh_example.full_model evaluated in" in result.output
assert "sqlmesh_example.incremental_model evaluated in" not in result.output
assert "sqlmesh_example.full_model [full refresh" in result.output
assert "sqlmesh_example.incremental_model [insert" not in result.output
assert_backfill_success(result)


Expand Down Expand Up @@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path):
assert "+ item_id + 1 AS item_id," not in result.output
assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output
# only incremental_model backfilled
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
assert_backfill_success(result)


Expand Down Expand Up @@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path):
"Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output
)
# only incremental_model backfilled
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
assert_backfill_success(result)


Expand Down
6 changes: 4 additions & 2 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode():
plan = context.plan(auto_apply=True, no_prompts=True)
new_snapshot = next(iter(plan.context_diff.new_snapshots.values()))

assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed."
assert (
mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed."
)

# Even though there are two builtin audits referenced in the above definition, we only
# store the one that overrides `blocking` in the snapshot; the other one isn't needed
Expand Down Expand Up @@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog
log = caplog.text
assert "'not_null' audit error:" in log
assert "'at_least_one_non_blocking' audit error:" in log
assert "Target environment updated successfully" in stdout
assert "Virtual layer updated" in stdout


def test_environment_statements(tmp_path: pathlib.Path):
Expand Down
12 changes: 6 additions & 6 deletions tests/integrations/jupyter/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,22 +291,22 @@ def test_plan(

# TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for
# the models and how long it took
assert len(output.stdout.strip().split("\n")) == 24
assert len(output.stdout.strip().split("\n")) == 46
assert not output.stderr
assert len(output.outputs) == 4
text_output = convert_all_html_output_to_text(output)
# TODO: Is this what we expect?
# This has minor differences between CI/CD and local.
assert "[2K" in text_output[0]
assert text_output[1].startswith(
"Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%"
"Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%"
)
# TODO: Is this what we expect?
assert text_output[2] == ""
assert text_output[3] == "Target environment updated successfully"
assert text_output[3] == "✔ Virtual layer updated"
assert convert_all_html_output_to_tags(output) == [
["pre", "span"],
["pre"] + ["span"] * 4,
["pre"] + ["span"] * 5,
["pre"],
["pre", "span"],
]
Expand All @@ -326,7 +326,7 @@ def test_run_dag(
assert not output.stderr
assert len(output.outputs) == 2
assert convert_all_html_output_to_text(output) == [
"Model batches executed successfully",
"Model batches executed",
"Run finished for environment 'prod'",
]
assert get_all_html_output(output) == [
Expand All @@ -337,7 +337,7 @@ def test_run_dag(
h(
"span",
{"style": SUCCESS_STYLE},
"Model batches executed successfully",
"Model batches executed",
autoescape=False,
),
autoescape=False,
Expand Down
14 changes: 10 additions & 4 deletions web/server/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from fastapi.encoders import jsonable_encoder
from sse_starlette.sse import ServerSentEvent

from sqlmesh.core.snapshot.definition import Interval
from sqlmesh.core.console import TerminalConsole
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.plan.definition import EvaluatablePlan
Expand Down Expand Up @@ -91,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None:

def start_evaluation_progress(
self,
batches: t.Dict[Snapshot, int],
batch_sizes: t.Dict[Snapshot, int],
environment_naming_info: EnvironmentNamingInfo,
default_catalog: t.Optional[str],
) -> None:
Expand All @@ -104,7 +104,7 @@ def start_evaluation_progress(
name=snapshot.name,
view_name=snapshot.display_name(environment_naming_info, default_catalog),
)
for snapshot, total_tasks in batches.items()
for snapshot, total_tasks in batch_sizes.items()
}
self.plan_apply_stage_tracker.add_stage(
models.PlanStage.backfill,
Expand All @@ -123,7 +123,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
self.log_event_plan_apply()

def update_snapshot_evaluation_progress(
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
self,
snapshot: Snapshot,
interval: Interval,
batch_idx: int,
duration_ms: t.Optional[int],
num_audits_passed: int,
num_audits_failed: int,
) -> None:
if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill:
task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name]
Expand Down