From 90994942f4dcb382818ba5beb0157251f7afeb93 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 14 Mar 2025 13:51:11 -0500 Subject: [PATCH 01/23] Overhaul CLI evaluation progress bar --- sqlmesh/core/config/root.py | 2 +- sqlmesh/core/console.py | 245 +++++++++++++++++++--- sqlmesh/core/scheduler.py | 14 +- tests/cli/test_cli.py | 46 ++-- tests/core/test_context.py | 6 +- tests/core/test_scheduler.py | 2 + tests/integrations/jupyter/test_magics.py | 6 +- web/server/console.py | 5 +- 8 files changed, 263 insertions(+), 63 deletions(-) diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index c1642ccd43..9b2250ab1a 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -147,7 +147,7 @@ class Config(BaseConfig): } _connection_config_validator = connection_config_validator - _scheduler_config_validator = scheduler_config_validator + _scheduler_config_validator = scheduler_config_validator # type: ignore _variables_validator = variables_validator @field_validator("gateways", mode="before") diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index c1dd42d78e..03f9755ace 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -24,21 +24,24 @@ from rich.syntax import Syntax from rich.table import Table from rich.tree import Tree +from sqlglot import exp from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.linter.rule import RuleViolation from sqlmesh.core.model import Model +from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.snapshot import ( Snapshot, SnapshotChangeCategory, SnapshotId, SnapshotInfoLike, ) +from sqlmesh.core.snapshot.definition import Interval, Intervals from sqlmesh.core.test import ModelTest from sqlmesh.utils import rich as srich from sqlmesh.utils import Verbosity from sqlmesh.utils.concurrency import NodeExecutionFailedError -from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds +from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_tstz from sqlmesh.utils.errors import ( PythonModelEvalError, NodeAuditsErrors, @@ -72,6 +75,8 @@ PROGRESS_BAR_WIDTH = 40 LINE_WRAP_WIDTH = 100 +GREEN_CHECK_MARK = "[green]\u2714[/green]" +RED_X_MARK = "\u274c" class Console(abc.ABC): @@ -88,10 +93,16 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: """Indicates that the evaluation has ended.""" + @abc.abstractmethod + def store_evaluation_audit_results( + self, snapshot: Snapshot, audit_results: t.List[AuditResult] + ) -> None: + """Stores the audit results for the snapshot evaluation.""" + @abc.abstractmethod def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batches: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -339,9 +350,14 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: pass + def store_evaluation_audit_results( + self, snapshot: Snapshot, audit_results: t.List[AuditResult] + ) -> None: + pass + def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batches: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -532,7 +548,10 @@ def __init__( self.evaluation_total_task: t.Optional[TaskID] = None self.evaluation_model_progress: t.Optional[Progress] = None self.evaluation_model_tasks: t.Dict[str, TaskID] = {} - self.evaluation_model_batches: t.Dict[Snapshot, int] = {} + self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {} + self.evaluation_model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {} + self.evaluation_model_column_widths: t.Dict[str, int] = {} + self.evaluation_audit_results: t.Dict[Snapshot, t.List[AuditResult]] = {} # Put in temporary values that are replaced when evaluating self.environment_naming_info = EnvironmentNamingInfo() @@ -571,15 +590,38 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: pass + def store_evaluation_audit_results( + self, snapshot: Snapshot, audit_results: t.List[AuditResult] + ) -> None: + self.evaluation_audit_results[snapshot] = audit_results + def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batched_intervals: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: """Indicates that a new snapshot evaluation progress has begun.""" if not self.evaluation_progress_live: - self.evaluation_total_progress = make_progress_bar("Evaluating models", self.console) + self.evaluation_model_batch_sizes = { + snapshot: len(intervals) for snapshot, intervals in batched_intervals.items() + } + self.environment_naming_info = environment_naming_info + self.default_catalog = default_catalog + + self.evaluation_model_info, self.evaluation_model_column_widths = ( + _create_evaluation_model_info( + batched_intervals, + self.evaluation_model_batch_sizes, + environment_naming_info, + default_catalog, + self.dialect, + ) + ) + + self.evaluation_total_progress = make_progress_bar( + "Evaluating model batches", self.console + ) self.evaluation_model_progress = Progress( TextColumn("{task.fields[view_name]}", justify="right"), @@ -595,13 +637,9 @@ def start_evaluation_progress( self.evaluation_progress_live.start() self.evaluation_total_task = self.evaluation_total_progress.add_task( - "Evaluating models...", total=sum(batches.values()) + "Evaluating models...", total=sum(self.evaluation_model_batch_sizes.values()) ) - self.evaluation_model_batches = batches - self.environment_naming_info = environment_naming_info - self.default_catalog = default_catalog - def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks: display_name = snapshot.display_name( @@ -610,7 +648,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task( f"Evaluating {display_name}...", view_name=display_name, - total=self.evaluation_model_batches[snapshot], + total=self.evaluation_model_batch_sizes[snapshot], ) def update_snapshot_evaluation_progress( @@ -622,11 +660,39 @@ def update_snapshot_evaluation_progress( and self.evaluation_model_progress and self.evaluation_progress_live ): - total_batches = self.evaluation_model_batches[snapshot] + total_batches = self.evaluation_model_batch_sizes[snapshot] + batch_num = str(batch_idx + 1).rjust(len(str(total_batches))) + batch = f"[{batch_num}/{total_batches}] " if duration_ms: + display_name = self.evaluation_model_info[snapshot]["display_name"].ljust( + self.evaluation_model_column_widths["display_name"] + ) + + annotation = self.evaluation_model_info[snapshot]["annotation"][batch_idx] + num_audits = 0 + num_audits_passed = 0 + if snapshot in self.evaluation_audit_results: + num_audits = len(self.evaluation_audit_results[snapshot]) + num_audits_passed = sum( + result.count == 0 for result in self.evaluation_audit_results[snapshot] + ) + num_audits_failed = num_audits - num_audits_passed + if num_audits_passed: + annotation += f", {num_audits_passed} audits pass" + if num_audits_failed: + annotation += f", {num_audits_failed} audits fail {RED_X_MARK}" + annotation = (annotation + "]").ljust( + self.evaluation_model_column_widths["annotation"] + ) + + # 8 characters for duration + # if the failed audit red X is present, the console adds an extra space + duration_width = 7 if num_audits_failed else 8 + duration = f"{(duration_ms / 1000.0):.2f}s".rjust(duration_width) + self.evaluation_progress_live.console.print( - f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s" + f"{GREEN_CHECK_MARK} {batch}{display_name}{annotation} {duration}" ) self.evaluation_total_progress.update( @@ -643,14 +709,17 @@ def stop_evaluation_progress(self, success: bool = True) -> None: if self.evaluation_progress_live: self.evaluation_progress_live.stop() if success: - self.log_success("Model batches executed successfully") + self.log_success("Model batches evaluated successfully") self.evaluation_progress_live = None self.evaluation_total_progress = None self.evaluation_total_task = None self.evaluation_model_progress = None self.evaluation_model_tasks = {} - self.evaluation_model_batches = {} + self.evaluation_model_batch_sizes = {} + self.evaluation_model_info = {} + self.evaluation_model_column_widths = {} + self.evaluation_audit_results = {} self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -665,6 +734,7 @@ def start_creation_progress( message = "Creating physical table" if total_tasks == 1 else "Creating physical tables" self.creation_progress = make_progress_bar(message, self.console) + self._print("") self.creation_progress.start() self.creation_task = self.creation_progress.add_task( "Creating physical tables...", @@ -679,7 +749,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: if self.creation_progress is not None and self.creation_task is not None: if self.verbosity >= Verbosity.VERBOSE: self.creation_progress.live.console.print( - f"{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} [green]created[/green]" + f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} [green]created[/green]" ) self.creation_progress.update(self.creation_task, refresh=True, advance=1) @@ -690,7 +760,7 @@ def stop_creation_progress(self, success: bool = True) -> None: self.creation_progress.stop() self.creation_progress = None if success: - self.log_success("Model versions created successfully") + self.log_success("\nModel versions created successfully") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -731,7 +801,7 @@ def start_promotion_progress( if self.promotion_progress is None: self.promotion_progress = Progress( TextColumn( - f"[bold blue]Virtually Updating '{environment_naming_info.name}'", + f"[bold blue]Virtually updating '{environment_naming_info.name}' environment views", justify="right", ), BarColumn(bar_width=PROGRESS_BAR_WIDTH), @@ -743,7 +813,7 @@ def start_promotion_progress( self.promotion_progress.start() self.promotion_task = self.promotion_progress.add_task( - f"Virtually Updating {environment_naming_info.name}...", + f"Virtually updating {environment_naming_info.name}...", total=total_tasks, ) @@ -756,7 +826,7 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) if self.verbosity >= Verbosity.VERBOSE: action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( - f"{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} {action_str}" + f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} {action_str}" ) self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) @@ -767,7 +837,7 @@ def stop_promotion_progress(self, success: bool = True) -> None: self.promotion_progress.stop() self.promotion_progress = None if success: - self.log_success("Target environment updated successfully") + self.log_success("\nEnvironment views updated successfully") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -1298,7 +1368,7 @@ def log_warning(self, short_message: str, long_message: t.Optional[str] = None) self._print(message_formatted) def log_success(self, message: str) -> None: - self._print(f"\n[green]{message}[/green]\n") + self._print(f"[green]{message}[/green]\n") def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID: id = uuid.uuid4() @@ -2243,11 +2313,13 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batched_intervals: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self.evaluation_batches = batches + self.evaluation_model_batch_sizes = { + snapshot: len(intervals) for snapshot, intervals in batched_intervals.items() + } self.evaluation_environment_naming_info = environment_naming_info self.default_catalog = default_catalog @@ -2257,13 +2329,15 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self.evaluation_environment_naming_info, self.default_catalog, dialect=self.dialect ) self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0) - print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}") + print( + f"Starting '{display_name}', Total batches: {self.evaluation_model_batch_sizes[snapshot]}" + ) def update_snapshot_evaluation_progress( self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] ) -> None: view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id] - total_batches = self.evaluation_batches[snapshot] + total_batches = self.evaluation_model_batch_sizes[snapshot] loaded_batches += 1 self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches) @@ -2275,7 +2349,7 @@ def update_snapshot_evaluation_progress( total_finished_loading = len( [ s - for s, total in self.evaluation_batches.items() + for s, total in self.evaluation_model_batch_sizes.items() if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total ] ) @@ -2401,11 +2475,11 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batched_intervals: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self._write(f"Starting evaluation for {len(batches)} snapshots") + self._write(f"Starting evaluation for {len(batched_intervals)} snapshots") def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self._write(f"Evaluating {snapshot.name}") @@ -2643,3 +2717,114 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str: msg = msg.replace("\n", "\n ") error_messages.append(msg) return " " + "\n".join(error_messages) + + +def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str: + if snapshot.is_model: + # only include time if interval < 1 day + fmt_func = ( + to_ds + if (interval[1] - interval[0]) >= datetime.timedelta(days=1).total_seconds() + else to_tstz + ) + return ( + f"insert {fmt_func(interval[0])} - {fmt_func(interval[1])}" + if snapshot.model.kind.is_incremental + or snapshot.model.kind.is_managed + or snapshot.model.kind.is_custom + else "" + ) + return "" + + +def _create_evaluation_model_info( + batched_intervals: t.Dict[Snapshot, Intervals], + model_batch_sizes: t.Dict[Snapshot, int], + environment_naming_info: EnvironmentNamingInfo, + default_catalog: t.Optional[str], + dialect: t.Optional[DialectType], +) -> t.Tuple[t.Dict[Snapshot, t.Dict[str, t.Any]], t.Dict[str, int]]: + """Creates model information dictionaries for model evaluation progress bar. + + Parameters: + batched_intervals: Dictionary mapping snapshot batches to their evaluation intervals + model_batch_sizes: Dictionary mapping snapshots to their batch sizes + environment_naming_info: Information about environment naming, needed to render model name + default_catalog: Optional default catalog name for rendering model name + dialect: Optional SQL dialect for rendering model name + + Returns: + A tuple containing: + - Dictionary mapping snapshots to their model's display information + - Dictionary of output field names to column widths + """ + model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {} + model_column_widths = {} + model_column_widths["display_name"] = 0 + model_column_widths["annotation"] = 0 + + for snapshot in batched_intervals: + model_info[snapshot] = {} + model_info[snapshot]["display_name"] = snapshot.display_name( + environment_naming_info, default_catalog, dialect=dialect + ) + model_column_widths["display_name"] = max( + model_column_widths["display_name"], len(model_info[snapshot]["display_name"]) + ) + + # The annotation includes audit results. We cannot build the audits result string + # until after evaluation occurs, but we must determine the annotation column width here. + # Therefore, we add enough padding for the longest possible audits result string. + audit_pad = 0 + if snapshot.is_model and snapshot.model.audits: + num_audits = len(snapshot.model.audits_with_args) + num_nonblocking_audits = sum( + 1 + for audit in snapshot.model.audits_with_args + if not audit[0].blocking + or ("blocking" in audit[1] and audit[1]["blocking"] == exp.false()) + ) + # make enough room for all audits to pass + audit_pad = len(f", {str(num_audits)} audits passed") + if num_nonblocking_audits: + # and add enough room for all nonblocking audits to fail + audit_pad += len(f", {str(num_nonblocking_audits)} audits failed X") # red X + audit_pad += 1 # closing bracket + + model_info[snapshot]["annotation"] = [ + _create_evaluation_model_annotation( + snapshot, + _format_evaluation_model_interval(snapshot, interval), + ) + for interval in batched_intervals[snapshot] + ] + model_column_widths["annotation"] = max( + model_column_widths["annotation"], + max(len(annotation) for annotation in model_info[snapshot]["annotation"]) + audit_pad, + ) + + model_column_widths["batch"] = 5 # number characters in default "[1/1]" + # do we need space for more than one digit? + if any(size > 9 for size in model_batch_sizes.values()): + model_column_widths["batch"] = ( + max(len(str(size)) for size in model_batch_sizes.values()) * 2 + ) + 3 # brackets and slash + + return model_info, model_column_widths + + +def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str: + if snapshot.is_audit or (snapshot.is_model and snapshot.model.kind.is_external): + return " \[run audits" + if snapshot.model.kind.is_seed: + return " \[insert from seed file" + if snapshot.model.kind.is_full: + return " \[full refresh" + if snapshot.model.kind.is_view: + return " \[recreate view" + if snapshot.model.kind.is_incremental_by_unique_key: + return " \[insert or update rows" + if snapshot.model.kind.is_incremental_by_partition: + return " \[insert partition" + + return f" \[{interval_info}" if interval_info else "" diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 6073a9f98d..bbd6629d18 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -165,6 +165,8 @@ def evaluate( execution_time: TimeLike, deployability_index: DeployabilityIndex, batch_index: int, + environment_naming_info: EnvironmentNamingInfo, + default_catalog: t.Optional[str], **kwargs: t.Any, ) -> None: """Evaluate a snapshot and add the processed interval to the state sync. @@ -205,6 +207,7 @@ def evaluate( wap_id=wap_id, **kwargs, ) + get_console().store_evaluation_audit_results(snapshot, audit_results) audit_errors_to_raise: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): @@ -224,8 +227,13 @@ def evaluate( if audit_result.blocking: audit_errors_to_raise.append(error) else: + display_name = snapshot.display_name( + environment_naming_info, + default_catalog, + self.snapshot_evaluator.adapter.dialect, + ) get_console().log_warning( - f"\n{error}.", + f"\n{display_name}: {error}.", long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", ) @@ -424,7 +432,7 @@ def run_merged_intervals( batched_intervals = self.batch_intervals(merged_intervals) self.console.start_evaluation_progress( - {snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()}, + batched_intervals, environment_naming_info, self.default_catalog, ) @@ -473,6 +481,8 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_time=execution_time, deployability_index=deployability_index, batch_index=batch_idx, + environment_naming_info=environment_naming_info, + default_catalog=self.default_catalog, ) evaluation_duration_ms = now_timestamp() - execution_start_ts finally: diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 1a9ec2cab8..15d342b546 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -132,18 +132,18 @@ def assert_model_versions_created(result) -> None: assert "Model versions created successfully" in result.output -def assert_model_batches_executed(result) -> None: - assert "Model batches executed successfully" in result.output +def assert_model_batches_evaluated(result) -> None: + assert "Model batches evaluated successfully" in result.output -def assert_target_env_updated(result) -> None: - assert "Target environment updated successfully" in result.output +def assert_env_views_updated(result) -> None: + assert "Environment views updated successfully" in result.output def assert_backfill_success(result) -> None: assert_model_versions_created(result) - assert_model_batches_executed(result) - assert_target_env_updated(result) + assert_model_batches_evaluated(result) + assert_env_views_updated(result) def assert_plan_success(result, new_env="prod", from_env="prod") -> None: @@ -242,9 +242,9 @@ def test_plan_restate_model(runner, tmp_path): assert result.exit_code == 0 assert_duckdb_test(result) assert "No changes to plan: project files match the `prod` environment" in result.output - assert "sqlmesh_example.full_model evaluated in" in result.output - assert_model_batches_executed(result) - assert_target_env_updated(result) + assert "sqlmesh_example.full_model [full refresh" in result.output + assert_model_batches_evaluated(result) + assert_env_views_updated(result) @pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"]) @@ -282,7 +282,7 @@ def test_plan_auto_apply(runner, tmp_path): # confirm verbose output not present assert "sqlmesh_example.seed_model created" not in result.output - assert "sqlmesh_example.seed_model promoted" not in result.output + assert "sqlmesh_example.seed_model updated" not in result.output def test_plan_verbose(runner, tmp_path): @@ -294,7 +294,7 @@ def test_plan_verbose(runner, tmp_path): ) assert_plan_success(result) assert "sqlmesh_example.seed_model created" in result.output - assert "sqlmesh_example.seed_model promoted" in result.output + assert "sqlmesh_example.seed_model updated" in result.output def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path): @@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev2", "dev", initialize=False) - assert_target_env_updated(result) + assert_env_views_updated(result) assert_virtual_update(result) @@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev", initialize=False) - assert_target_env_updated(result) + assert_env_views_updated(result) assert_virtual_update(result) @@ -552,8 +552,8 @@ def test_plan_nonbreaking(runner, tmp_path): assert "+ 'a' AS new_col" in result.output assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output - assert "sqlmesh_example.incremental_model evaluated in" in result.output - assert "sqlmesh_example.full_model evaluated in" not in result.output + assert "sqlmesh_example.incremental_model [insert" in result.output + assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output assert_backfill_success(result) @@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path): assert result.exit_code == 0 assert "+ item_id + 1 AS item_id," in result.output assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output - assert "sqlmesh_example.full_model evaluated in" in result.output - assert "sqlmesh_example.incremental_model evaluated in" not in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.incremental_model [insert" not in result.output assert_backfill_success(result) @@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path): assert "+ item_id + 1 AS item_id," not in result.output assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output - assert "sqlmesh_example__dev.full_model evaluated in" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) @@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path): "Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output ) # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output - assert "sqlmesh_example__dev.full_model evaluated in" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) @@ -718,7 +718,7 @@ def test_run_dev(runner, tmp_path, flag): # Confirm backfill occurs when we run non-backfilled dev env result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"]) assert result.exit_code == 0 - assert_model_batches_executed(result) + assert_model_batches_evaluated(result) @time_machine.travel(FREEZE_TIME) @@ -750,7 +750,7 @@ def test_run_cron_elapsed(runner, tmp_path): result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"]) assert result.exit_code == 0 - assert_model_batches_executed(result) + assert_model_batches_evaluated(result) def test_clean(runner, tmp_path): diff --git a/tests/core/test_context.py b/tests/core/test_context.py index b2c5090269..860421fb46 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode(): plan = context.plan(auto_apply=True, no_prompts=True) new_snapshot = next(iter(plan.context_diff.new_snapshots.values())) - assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed." + assert ( + mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed." + ) # Even though there are two builtin audits referenced in the above definition, we only # store the one that overrides `blocking` in the snapshot; the other one isn't needed @@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog log = caplog.text assert "'not_null' audit error:" in log assert "'at_least_one_non_blocking' audit error:" in log - assert "Target environment updated successfully" in stdout + assert "Environment views updated successfully" in stdout def test_environment_statements(tmp_path: pathlib.Path): diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index cfe3bf52bb..dbe62f8a44 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -528,6 +528,8 @@ def _evaluate(): to_datetime("2022-01-03"), DeployabilityIndex.all_deployable(), 0, + EnvironmentNamingInfo(), + None, ) evaluator_audit_mock.return_value = [ diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index 95ce610286..4f84c2fbef 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -291,7 +291,7 @@ def test_plan( # TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for # the models and how long it took - assert len(output.stdout.strip().split("\n")) == 24 + assert len(output.stdout.strip().split("\n")) == 46 assert not output.stderr assert len(output.outputs) == 4 text_output = convert_all_html_output_to_text(output) @@ -326,7 +326,7 @@ def test_run_dag( assert not output.stderr assert len(output.outputs) == 2 assert convert_all_html_output_to_text(output) == [ - "Model batches executed successfully", + "Model batches evaluated successfully", "Run finished for environment 'prod'", ] assert get_all_html_output(output) == [ @@ -337,7 +337,7 @@ def test_run_dag( h( "span", {"style": SUCCESS_STYLE}, - "Model batches executed successfully", + "Model batches evaluated successfully", autoescape=False, ), autoescape=False, diff --git a/web/server/console.py b/web/server/console.py index 3112de3f14..8ca407631b 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -12,6 +12,7 @@ from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.plan.definition import EvaluatablePlan from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike +from sqlmesh.core.snapshot.definition import Intervals from sqlmesh.core.test import ModelTest from sqlmesh.utils.date import now_timestamp from web.server import models @@ -91,7 +92,7 @@ def stop_restate_progress(self, success: bool) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batched_intervals: t.Dict[Snapshot, Intervals], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -104,7 +105,7 @@ def start_evaluation_progress( name=snapshot.name, view_name=snapshot.display_name(environment_naming_info, default_catalog), ) - for snapshot, total_tasks in batches.items() + for snapshot, total_tasks in batched_intervals.items() } self.plan_apply_stage_tracker.add_stage( models.PlanStage.backfill, From 35e2fe7483600b88c17135f60cf2c9c1e76780dc Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 14 Mar 2025 14:35:27 -0500 Subject: [PATCH 02/23] Fix magics test --- tests/integrations/jupyter/test_magics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index 4f84c2fbef..88b037c765 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -299,11 +299,11 @@ def test_plan( # This has minor differences between CI/CD and local. assert "[2K" in text_output[0] assert text_output[1].startswith( - "Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" + "Virtually updating 'prod' environment views ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" ) # TODO: Is this what we expect? assert text_output[2] == "" - assert text_output[3] == "Target environment updated successfully" + assert text_output[3] == "Environment views updated successfully" assert convert_all_html_output_to_tags(output) == [ ["pre", "span"], ["pre"] + ["span"] * 4, From 772b73a7a83742295a7082ef27c228ab84fbf58c Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 14 Mar 2025 16:35:56 -0500 Subject: [PATCH 03/23] Store audit results via callback rather than local console reference --- sqlmesh/core/console.py | 1 + sqlmesh/core/scheduler.py | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 03f9755ace..2a7f99acbf 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -1356,6 +1356,7 @@ def log_warning(self, short_message: str, long_message: t.Optional[str] = None) logger.warning(long_message or short_message) if not self.ignore_warnings: if long_message: + file_path = None for handler in logger.root.handlers: if isinstance(handler, logging.FileHandler): file_path = handler.baseFilename diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index bbd6629d18..f841e3d9d9 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -7,6 +7,7 @@ from sqlmesh.core.console import Console, get_console from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements from sqlmesh.core.macros import RuntimeStage +from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.node import IntervalUnit from sqlmesh.core.notification_target import ( NotificationEvent, @@ -167,6 +168,8 @@ def evaluate( batch_index: int, environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], + on_audits_complete: t.Optional[t.Callable[[Snapshot, t.List[AuditResult]], None]] = None, + on_audit_warning: t.Optional[t.Callable[[str, t.Optional[str]], None]] = None, **kwargs: t.Any, ) -> None: """Evaluate a snapshot and add the processed interval to the state sync. @@ -207,7 +210,8 @@ def evaluate( wap_id=wap_id, **kwargs, ) - get_console().store_evaluation_audit_results(snapshot, audit_results) + if on_audits_complete: + on_audits_complete(snapshot, audit_results) audit_errors_to_raise: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): @@ -232,10 +236,11 @@ def evaluate( default_catalog, self.snapshot_evaluator.adapter.dialect, ) - get_console().log_warning( - f"\n{display_name}: {error}.", - long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", - ) + if on_audit_warning: + on_audit_warning( + f"\n{display_name}: {error}.", + f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", + ) if audit_errors_to_raise: raise NodeAuditsErrors(audit_errors_to_raise) @@ -483,6 +488,8 @@ def evaluate_node(node: SchedulingUnit) -> None: batch_index=batch_idx, environment_naming_info=environment_naming_info, default_catalog=self.default_catalog, + on_audits_complete=self.console.store_evaluation_audit_results, + on_audit_warning=self.console.log_warning, ) evaluation_duration_ms = now_timestamp() - execution_start_ts finally: From bc331685ea72ff41cfb39ade703d0a85e46faa6a Mon Sep 17 00:00:00 2001 From: Trey Spiller <1831878+treysp@users.noreply.github.com> Date: Wed, 19 Mar 2025 12:37:34 -0500 Subject: [PATCH 04/23] Feat: add second level of verbosity (#3982) --- sqlmesh/core/console.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 2a7f99acbf..30b6bfce85 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -824,7 +824,8 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) """Update the snapshot promotion progress.""" if self.promotion_progress is not None and self.promotion_task is not None: if self.verbosity >= Verbosity.VERBOSE: - action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" + check_mark = f"{GREEN_CHECK_MARK} " if promoted else " " + action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} {action_str}" ) From b7b676b6b75bae899404e68825078ec62056dfc7 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Mon, 24 Mar 2025 14:17:03 -0500 Subject: [PATCH 05/23] Call console directly in scheduler --- sqlmesh/core/scheduler.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index f841e3d9d9..4450e33532 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -7,7 +7,6 @@ from sqlmesh.core.console import Console, get_console from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements from sqlmesh.core.macros import RuntimeStage -from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.node import IntervalUnit from sqlmesh.core.notification_target import ( NotificationEvent, @@ -168,8 +167,6 @@ def evaluate( batch_index: int, environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], - on_audits_complete: t.Optional[t.Callable[[Snapshot, t.List[AuditResult]], None]] = None, - on_audit_warning: t.Optional[t.Callable[[str, t.Optional[str]], None]] = None, **kwargs: t.Any, ) -> None: """Evaluate a snapshot and add the processed interval to the state sync. @@ -210,8 +207,7 @@ def evaluate( wap_id=wap_id, **kwargs, ) - if on_audits_complete: - on_audits_complete(snapshot, audit_results) + self.console.store_evaluation_audit_results(snapshot, audit_results) audit_errors_to_raise: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): @@ -236,11 +232,10 @@ def evaluate( default_catalog, self.snapshot_evaluator.adapter.dialect, ) - if on_audit_warning: - on_audit_warning( - f"\n{display_name}: {error}.", - f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", - ) + self.console.log_warning( + f"\n{display_name}: {error}.", + f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", + ) if audit_errors_to_raise: raise NodeAuditsErrors(audit_errors_to_raise) @@ -488,8 +483,6 @@ def evaluate_node(node: SchedulingUnit) -> None: batch_index=batch_idx, environment_naming_info=environment_naming_info, default_catalog=self.default_catalog, - on_audits_complete=self.console.store_evaluation_audit_results, - on_audit_warning=self.console.log_warning, ) evaluation_duration_ms = now_timestamp() - execution_start_ts finally: From e18f36832ad4700ed0312f681446b10c0cf29498 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 25 Mar 2025 09:55:23 -0500 Subject: [PATCH 06/23] Include default catalog in model names if very verbose --- sqlmesh/core/console.py | 66 +++++++++++++++++------------ sqlmesh/core/snapshot/definition.py | 9 ++++ 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 30b6bfce85..aa4f63b864 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -643,7 +643,9 @@ def start_evaluation_progress( def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks: display_name = snapshot.display_name( - self.environment_naming_info, self.default_catalog, dialect=self.dialect + self.environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task( f"Evaluating {display_name}...", @@ -749,7 +751,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: if self.creation_progress is not None and self.creation_task is not None: if self.verbosity >= Verbosity.VERBOSE: self.creation_progress.live.console.print( - f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} [green]created[/green]" + f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} [green]created[/green]" ) self.creation_progress.update(self.creation_task, refresh=True, advance=1) @@ -827,7 +829,7 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) check_mark = f"{GREEN_CHECK_MARK} " if promoted else " " action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( - f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} {action_str}" + f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} {action_str}" ) self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) @@ -1044,7 +1046,7 @@ def _show_summary_tree_for( for s_id in sorted(added_snapshot_ids): snapshot = context_diff.snapshots[s_id] added_tree.add( - f"[added]{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[added]{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) tree.add(self._limit_model_names(added_tree, self.verbosity)) if removed_snapshot_ids: @@ -1052,7 +1054,7 @@ def _show_summary_tree_for( for s_id in sorted(removed_snapshot_ids): snapshot_table_info = context_diff.removed_snapshots[s_id] removed_tree.add( - f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) tree.add(self._limit_model_names(removed_tree, self.verbosity)) if modified_snapshot_ids: @@ -1062,7 +1064,9 @@ def _show_summary_tree_for( for s_id in modified_snapshot_ids: name = s_id.name display_name = context_diff.snapshots[s_id].display_name( - environment_naming_info, default_catalog, dialect=self.dialect + environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) if context_diff.directly_modified(name): direct.add( @@ -1139,7 +1143,7 @@ def _prompt_categorize( if not no_diff: self.show_sql(plan.context_diff.text_diff(snapshot.name)) tree = Tree( - f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) indirect_tree = None @@ -1149,7 +1153,7 @@ def _prompt_categorize( indirect_tree = Tree("[indirect]Indirectly Modified Children:") tree.add(indirect_tree) indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) @@ -1167,7 +1171,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if context_diff.directly_modified(snapshot.name): category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] tree = Tree( - f"\n[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({category_str})" + f"\n[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({category_str})" ) indirect_tree = None for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): @@ -1179,13 +1183,13 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st child_snapshot.change_category ] indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({child_category_str})" + f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({child_category_str})" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) elif context_diff.metadata_updated(snapshot.name): tree = Tree( - f"\n[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"\n[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) else: continue @@ -1212,7 +1216,9 @@ def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> N preview_modifier = " ([orange1]preview[/orange1])" display_name = snapshot.display_name( - plan.environment_naming_info, default_catalog, dialect=self.dialect + plan.environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) backfill.add( f"{display_name}: \\[{_format_missing_intervals(snapshot, missing)}]{preview_modifier}" @@ -1610,7 +1616,9 @@ def _snapshot_change_choices( use_rich_formatting: bool = True, ) -> t.Dict[SnapshotChangeCategory, str]: direct = snapshot.display_name( - environment_naming_info, default_catalog, dialect=self.dialect + environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) if use_rich_formatting: direct = f"[direct]{direct}[/direct]" @@ -2079,7 +2087,7 @@ def show_model_difference_summary( else: for snapshot in added_models: self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) added_snapshot_audits = {s for s in added_snapshots if s.is_audit} @@ -2087,7 +2095,7 @@ def show_model_difference_summary( self._print("\n**Added Standalone Audits:**") for snapshot in sorted(added_snapshot_audits): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) removed_snapshot_table_infos = set(context_diff.removed_snapshots.values()) @@ -2106,7 +2114,7 @@ def show_model_difference_summary( else: for snapshot_table_info in removed_models: self._print( - f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit} @@ -2114,7 +2122,7 @@ def show_model_difference_summary( self._print("\n**Removed Standalone Audits:**") for snapshot_table_info in sorted(removed_audit_snapshot_table_infos): self._print( - f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) modified_snapshots = { @@ -2135,7 +2143,7 @@ def show_model_difference_summary( self._print("\n**Directly Modified:**") for snapshot in sorted(directly_modified): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) if not no_diff: self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```") @@ -2148,20 +2156,20 @@ def show_model_difference_summary( and modified_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD ): self._print( - f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`\n" + f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`\n" f"- `.... {modified_length-2} more ....`\n" - f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) else: for snapshot in indirectly_modified: self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) if metadata_modified: self._print("\n**Metadata Updated:**") for snapshot in sorted(metadata_modified): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: @@ -2181,7 +2189,9 @@ def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> N preview_modifier = " (**preview**)" display_name = snapshot.display_name( - plan.environment_naming_info, default_catalog, dialect=self.dialect + plan.environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) snapshots.append( f"* `{display_name}`: [{_format_missing_intervals(snapshot, missing)}]{preview_modifier}" @@ -2205,7 +2215,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if context_diff.directly_modified(snapshot.name): category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] tree = Tree( - f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({category_str})" + f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({category_str})" ) indirect_tree = None for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): @@ -2217,13 +2227,13 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st child_snapshot.change_category ] indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({child_category_str})" + f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({child_category_str})" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) elif context_diff.metadata_updated(snapshot.name): tree = Tree( - f"[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) else: continue @@ -2328,7 +2338,9 @@ def start_evaluation_progress( def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if not self.evaluation_batch_progress.get(snapshot.snapshot_id): display_name = snapshot.display_name( - self.evaluation_environment_naming_info, self.default_catalog, dialect=self.dialect + self.evaluation_environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0) print( diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 920e165ba5..f208001904 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1559,6 +1559,15 @@ def display_name( Returns the model name as a qualified view name. This is just used for presenting information back to the user and `qualified_view_name` should be used when wanting a view name in all other cases. + + Args: + snapshot_info_like: The snapshot info object to get the display name for + environment_naming_info: Environment naming info to use for display name formatting + default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name. + dialect: Optional dialect type to use for name formatting + + Returns: + The formatted display name as a string """ if snapshot_info_like.is_audit: return snapshot_info_like.name From e19d01cd560c2c659434080c487460213c5ec27c Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 25 Mar 2025 11:31:58 -0500 Subject: [PATCH 07/23] Pass audit results directly to evaluation prog bar output --- sqlmesh/core/console.py | 66 +++++++++++++++++++-------------------- sqlmesh/core/scheduler.py | 19 ++++++++--- web/server/console.py | 7 ++++- 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index aa4f63b864..252c7f0e15 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -29,7 +29,6 @@ from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.linter.rule import RuleViolation from sqlmesh.core.model import Model -from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.snapshot import ( Snapshot, SnapshotChangeCategory, @@ -93,12 +92,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: """Indicates that the evaluation has ended.""" - @abc.abstractmethod - def store_evaluation_audit_results( - self, snapshot: Snapshot, audit_results: t.List[AuditResult] - ) -> None: - """Stores the audit results for the snapshot evaluation.""" - @abc.abstractmethod def start_evaluation_progress( self, @@ -114,7 +107,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: @abc.abstractmethod def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: """Updates the snapshot evaluation progress.""" @@ -350,11 +348,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: pass - def store_evaluation_audit_results( - self, snapshot: Snapshot, audit_results: t.List[AuditResult] - ) -> None: - pass - def start_evaluation_progress( self, batches: t.Dict[Snapshot, Intervals], @@ -367,7 +360,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: pass def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: pass @@ -551,7 +549,6 @@ def __init__( self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {} self.evaluation_model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {} self.evaluation_model_column_widths: t.Dict[str, int] = {} - self.evaluation_audit_results: t.Dict[Snapshot, t.List[AuditResult]] = {} # Put in temporary values that are replaced when evaluating self.environment_naming_info = EnvironmentNamingInfo() @@ -590,11 +587,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: def stop_plan_evaluation(self) -> None: pass - def store_evaluation_audit_results( - self, snapshot: Snapshot, audit_results: t.List[AuditResult] - ) -> None: - self.evaluation_audit_results[snapshot] = audit_results - def start_evaluation_progress( self, batched_intervals: t.Dict[Snapshot, Intervals], @@ -654,7 +646,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: ) def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: """Update the snapshot evaluation progress.""" if ( @@ -672,14 +669,6 @@ def update_snapshot_evaluation_progress( ) annotation = self.evaluation_model_info[snapshot]["annotation"][batch_idx] - num_audits = 0 - num_audits_passed = 0 - if snapshot in self.evaluation_audit_results: - num_audits = len(self.evaluation_audit_results[snapshot]) - num_audits_passed = sum( - result.count == 0 for result in self.evaluation_audit_results[snapshot] - ) - num_audits_failed = num_audits - num_audits_passed if num_audits_passed: annotation += f", {num_audits_passed} audits pass" if num_audits_failed: @@ -721,7 +710,6 @@ def stop_evaluation_progress(self, success: bool = True) -> None: self.evaluation_model_batch_sizes = {} self.evaluation_model_info = {} self.evaluation_model_column_widths = {} - self.evaluation_audit_results = {} self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -2348,7 +2336,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: ) def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id] total_batches = self.evaluation_model_batch_sizes[snapshot] @@ -2499,9 +2492,16 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self._write(f"Evaluating {snapshot.name}") def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: - self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms") + self._write( + f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}" + ) def stop_evaluation_progress(self, success: bool = True) -> None: self._write(f"Stopping evaluation with success={success}") diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 4450e33532..b76f6ee804 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -7,6 +7,7 @@ from sqlmesh.core.console import Console, get_console from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements from sqlmesh.core.macros import RuntimeStage +from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.node import IntervalUnit from sqlmesh.core.notification_target import ( NotificationEvent, @@ -168,7 +169,7 @@ def evaluate( environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], **kwargs: t.Any, - ) -> None: + ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. Args: @@ -180,6 +181,9 @@ def evaluate( batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it auto_restatement_enabled: Whether to enable auto restatements. kwargs: Additional kwargs to pass to the renderer. + + Returns: + List of audit results from the evaluation. """ validate_date_range(start, end) @@ -207,7 +211,6 @@ def evaluate( wap_id=wap_id, **kwargs, ) - self.console.store_evaluation_audit_results(snapshot, audit_results) audit_errors_to_raise: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): @@ -241,6 +244,7 @@ def evaluate( raise NodeAuditsErrors(audit_errors_to_raise) self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable) + return audit_results def run( self, @@ -474,7 +478,8 @@ def evaluate_node(node: SchedulingUnit) -> None: try: assert execution_time # mypy assert deployability_index # mypy - self.evaluate( + audit_results = [] + audit_results = self.evaluate( snapshot=snapshot, start=start, end=end, @@ -486,8 +491,14 @@ def evaluate_node(node: SchedulingUnit) -> None: ) evaluation_duration_ms = now_timestamp() - execution_start_ts finally: + num_audits = len(audit_results) + num_audits_failed = sum(1 for result in audit_results if result.count) self.console.update_snapshot_evaluation_progress( - snapshot, batch_idx, evaluation_duration_ms + snapshot, + batch_idx, + evaluation_duration_ms, + num_audits - num_audits_failed, + num_audits_failed, ) try: diff --git a/web/server/console.py b/web/server/console.py index 8ca407631b..ae77b52335 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -124,7 +124,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self.log_event_plan_apply() def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + batch_idx: int, + duration_ms: t.Optional[int], + audits_passed: int, + audits_failed: int, ) -> None: if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill: task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name] From ba7757010185ec687cabf90a5fef756043aba39d Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 25 Mar 2025 19:03:33 -0500 Subject: [PATCH 08/23] Do not precompile model evaluation info --- sqlmesh/core/console.py | 79 +++++++++++++++++++-------------------- sqlmesh/core/scheduler.py | 3 +- tests/cli/test_cli.py | 16 ++++---- web/server/console.py | 6 +-- 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 252c7f0e15..b3bc747b06 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -95,7 +95,7 @@ def stop_plan_evaluation(self) -> None: @abc.abstractmethod def start_evaluation_progress( self, - batches: t.Dict[Snapshot, Intervals], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -109,6 +109,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], num_audits_passed: int, @@ -350,7 +351,7 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, Intervals], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -362,6 +363,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], num_audits_passed: int, @@ -531,6 +533,13 @@ class TerminalConsole(Console): TABLE_DIFF_SOURCE_BLUE = "#0248ff" + EVAL_PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = { + "batch": 9, + "name": 50, + "annotation": 50, + "duration": 8, + } + def __init__( self, console: t.Optional[RichConsole] = None, @@ -546,9 +555,6 @@ def __init__( self.evaluation_total_task: t.Optional[TaskID] = None self.evaluation_model_progress: t.Optional[Progress] = None self.evaluation_model_tasks: t.Dict[str, TaskID] = {} - self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {} - self.evaluation_model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {} - self.evaluation_model_column_widths: t.Dict[str, int] = {} # Put in temporary values that are replaced when evaluating self.environment_naming_info = EnvironmentNamingInfo() @@ -589,28 +595,12 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batched_intervals: t.Dict[Snapshot, Intervals], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: """Indicates that a new snapshot evaluation progress has begun.""" if not self.evaluation_progress_live: - self.evaluation_model_batch_sizes = { - snapshot: len(intervals) for snapshot, intervals in batched_intervals.items() - } - self.environment_naming_info = environment_naming_info - self.default_catalog = default_catalog - - self.evaluation_model_info, self.evaluation_model_column_widths = ( - _create_evaluation_model_info( - batched_intervals, - self.evaluation_model_batch_sizes, - environment_naming_info, - default_catalog, - self.dialect, - ) - ) - self.evaluation_total_progress = make_progress_bar( "Evaluating model batches", self.console ) @@ -629,9 +619,13 @@ def start_evaluation_progress( self.evaluation_progress_live.start() self.evaluation_total_task = self.evaluation_total_progress.add_task( - "Evaluating models...", total=sum(self.evaluation_model_batch_sizes.values()) + "Evaluating models...", total=sum(batch_sizes.values()) ) + self.evaluation_model_batch_sizes = batch_sizes + self.environment_naming_info = environment_naming_info + self.default_catalog = default_catalog + def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks: display_name = snapshot.display_name( @@ -648,6 +642,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], num_audits_passed: int, @@ -661,26 +656,32 @@ def update_snapshot_evaluation_progress( ): total_batches = self.evaluation_model_batch_sizes[snapshot] batch_num = str(batch_idx + 1).rjust(len(str(total_batches))) - batch = f"[{batch_num}/{total_batches}] " + batch = f"[{batch_num}/{total_batches}]".ljust( + self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["batch"] + ) if duration_ms: - display_name = self.evaluation_model_info[snapshot]["display_name"].ljust( - self.evaluation_model_column_widths["display_name"] + display_name = snapshot.display_name( + self.environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, + ).ljust(self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["name"]) + + annotation = _create_evaluation_model_annotation( + snapshot, _format_evaluation_model_interval(snapshot, interval) ) - annotation = self.evaluation_model_info[snapshot]["annotation"][batch_idx] if num_audits_passed: annotation += f", {num_audits_passed} audits pass" if num_audits_failed: annotation += f", {num_audits_failed} audits fail {RED_X_MARK}" annotation = (annotation + "]").ljust( - self.evaluation_model_column_widths["annotation"] + self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["annotation"] ) - # 8 characters for duration - # if the failed audit red X is present, the console adds an extra space - duration_width = 7 if num_audits_failed else 8 - duration = f"{(duration_ms / 1000.0):.2f}s".rjust(duration_width) + duration = f"{(duration_ms / 1000.0):.2f}s".rjust( + self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["duration"] + ) self.evaluation_progress_live.console.print( f"{GREEN_CHECK_MARK} {batch}{display_name}{annotation} {duration}" @@ -708,8 +709,6 @@ def stop_evaluation_progress(self, success: bool = True) -> None: self.evaluation_model_progress = None self.evaluation_model_tasks = {} self.evaluation_model_batch_sizes = {} - self.evaluation_model_info = {} - self.evaluation_model_column_widths = {} self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -2313,13 +2312,11 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool: def start_evaluation_progress( self, - batched_intervals: t.Dict[Snapshot, Intervals], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self.evaluation_model_batch_sizes = { - snapshot: len(intervals) for snapshot, intervals in batched_intervals.items() - } + self.evaluation_model_batch_sizes = batch_sizes self.evaluation_environment_naming_info = environment_naming_info self.default_catalog = default_catalog @@ -2338,6 +2335,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], num_audits_passed: int, @@ -2482,11 +2480,11 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batched_intervals: t.Dict[Snapshot, Intervals], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self._write(f"Starting evaluation for {len(batched_intervals)} snapshots") + self._write(f"Starting evaluation for {sum(batch_sizes.values())} snapshots") def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self._write(f"Evaluating {snapshot.name}") @@ -2494,6 +2492,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], num_audits_passed: int, diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index b76f6ee804..56339a26dc 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -436,7 +436,7 @@ def run_merged_intervals( batched_intervals = self.batch_intervals(merged_intervals) self.console.start_evaluation_progress( - batched_intervals, + {snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()}, environment_naming_info, self.default_catalog, ) @@ -495,6 +495,7 @@ def evaluate_node(node: SchedulingUnit) -> None: num_audits_failed = sum(1 for result in audit_results if result.count) self.console.update_snapshot_evaluation_progress( snapshot, + batched_intervals[snapshot][batch_idx], batch_idx, evaluation_duration_ms, num_audits - num_audits_failed, diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 15d342b546..df25d21ca4 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -242,7 +242,7 @@ def test_plan_restate_model(runner, tmp_path): assert result.exit_code == 0 assert_duckdb_test(result) assert "No changes to plan: project files match the `prod` environment" in result.output - assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output assert_model_batches_evaluated(result) assert_env_views_updated(result) @@ -552,7 +552,7 @@ def test_plan_nonbreaking(runner, tmp_path): assert "+ 'a' AS new_col" in result.output assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output - assert "sqlmesh_example.incremental_model [insert" in result.output + assert "sqlmesh_example.incremental_model [insert" in result.output assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output assert_backfill_success(result) @@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path): assert result.exit_code == 0 assert "+ item_id + 1 AS item_id," in result.output assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output - assert "sqlmesh_example.full_model [full refresh" in result.output - assert "sqlmesh_example.incremental_model [insert" not in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.incremental_model [insert" not in result.output assert_backfill_success(result) @@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path): assert "+ item_id + 1 AS item_id," not in result.output assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model [insert" in result.output - assert "sqlmesh_example__dev.full_model [full refresh" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) @@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path): "Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output ) # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model [insert" in result.output - assert "sqlmesh_example__dev.full_model [full refresh" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) diff --git a/web/server/console.py b/web/server/console.py index ae77b52335..df35e514ff 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -7,12 +7,11 @@ from fastapi.encoders import jsonable_encoder from sse_starlette.sse import ServerSentEvent - +from sqlmesh.core.snapshot.definition import Interval from sqlmesh.core.console import TerminalConsole from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.plan.definition import EvaluatablePlan from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike -from sqlmesh.core.snapshot.definition import Intervals from sqlmesh.core.test import ModelTest from sqlmesh.utils.date import now_timestamp from web.server import models @@ -92,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None: def start_evaluation_progress( self, - batched_intervals: t.Dict[Snapshot, Intervals], + batched_intervals: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -126,6 +125,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: def update_snapshot_evaluation_progress( self, snapshot: Snapshot, + interval: Interval, batch_idx: int, duration_ms: t.Optional[int], audits_passed: int, From 58a36d17be5a826e3db6f215bd1433e62a0e8334 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 25 Mar 2025 19:22:10 -0500 Subject: [PATCH 09/23] Add plan summary of executed model kinds --- sqlmesh/core/console.py | 213 ++++++++++------------ sqlmesh/core/plan/evaluator.py | 35 ++++ tests/cli/test_cli.py | 12 +- tests/core/test_context.py | 2 +- tests/integrations/jupyter/test_magics.py | 6 +- 5 files changed, 137 insertions(+), 131 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index b3bc747b06..4c8c4f6400 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -35,12 +35,12 @@ SnapshotId, SnapshotInfoLike, ) -from sqlmesh.core.snapshot.definition import Interval, Intervals +from sqlmesh.core.snapshot.definition import Interval from sqlmesh.core.test import ModelTest from sqlmesh.utils import rich as srich from sqlmesh.utils import Verbosity from sqlmesh.utils.concurrency import NodeExecutionFailedError -from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_tstz +from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_datetime from sqlmesh.utils.errors import ( PythonModelEvalError, NodeAuditsErrors, @@ -74,7 +74,8 @@ PROGRESS_BAR_WIDTH = 40 LINE_WRAP_WIDTH = 100 -GREEN_CHECK_MARK = "[green]\u2714[/green]" +CHECK_MARK = "\u2714" +GREEN_CHECK_MARK = f"[green]{CHECK_MARK}[/green]" RED_X_MARK = "\u274c" @@ -533,8 +534,8 @@ class TerminalConsole(Console): TABLE_DIFF_SOURCE_BLUE = "#0248ff" - EVAL_PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = { - "batch": 9, + PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = { + "batch": 7, "name": 50, "annotation": 50, "duration": 8, @@ -602,7 +603,7 @@ def start_evaluation_progress( """Indicates that a new snapshot evaluation progress has begun.""" if not self.evaluation_progress_live: self.evaluation_total_progress = make_progress_bar( - "Evaluating model batches", self.console + "Executing model batches", self.console ) self.evaluation_model_progress = Progress( @@ -656,31 +657,42 @@ def update_snapshot_evaluation_progress( ): total_batches = self.evaluation_model_batch_sizes[snapshot] batch_num = str(batch_idx + 1).rjust(len(str(total_batches))) - batch = f"[{batch_num}/{total_batches}]".ljust( - self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["batch"] - ) + batch = f"[{batch_num}/{total_batches}]".ljust(self.PROGRESS_BAR_COLUMN_WIDTHS["batch"]) if duration_ms: - display_name = snapshot.display_name( - self.environment_naming_info, - self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, - dialect=self.dialect, - ).ljust(self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["name"]) + display_name = _justify_evaluation_model_info( + snapshot.display_name( + self.environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, + ), + self.PROGRESS_BAR_COLUMN_WIDTHS["name"], + ) annotation = _create_evaluation_model_annotation( snapshot, _format_evaluation_model_interval(snapshot, interval) ) - + audits_str = "" if num_audits_passed: - annotation += f", {num_audits_passed} audits pass" + audits_str += f" {CHECK_MARK}{num_audits_passed}" if num_audits_failed: - annotation += f", {num_audits_failed} audits fail {RED_X_MARK}" - annotation = (annotation + "]").ljust( - self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["annotation"] + audits_str += f" {RED_X_MARK}{num_audits_failed}" + audits_str = f", audits{audits_str}" if audits_str else "" + + annotation_width = self.PROGRESS_BAR_COLUMN_WIDTHS["annotation"] + annotation = _justify_evaluation_model_info( + annotation + audits_str, + annotation_width + if not num_audits_failed + else annotation_width - 1, # -1 for RED_X_MARK's extra space + dots_side="right", + prefix=" \[", + suffix="]", ) + annotation = annotation.replace(CHECK_MARK, GREEN_CHECK_MARK) - duration = f"{(duration_ms / 1000.0):.2f}s".rjust( - self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["duration"] + duration = f"{(duration_ms / 1000.0):.2f}s".ljust( + self.PROGRESS_BAR_COLUMN_WIDTHS["duration"] ) self.evaluation_progress_live.console.print( @@ -701,7 +713,7 @@ def stop_evaluation_progress(self, success: bool = True) -> None: if self.evaluation_progress_live: self.evaluation_progress_live.stop() if success: - self.log_success("Model batches evaluated successfully") + self.log_success(f"{GREEN_CHECK_MARK} Model batches executed successfully") self.evaluation_progress_live = None self.evaluation_total_progress = None @@ -720,13 +732,12 @@ def start_creation_progress( ) -> None: """Indicates that a new creation progress has begun.""" if self.creation_progress is None: - message = "Creating physical table" if total_tasks == 1 else "Creating physical tables" - self.creation_progress = make_progress_bar(message, self.console) + self.creation_progress = make_progress_bar("Updating physical layer", self.console) self._print("") self.creation_progress.start() self.creation_task = self.creation_progress.add_task( - "Creating physical tables...", + "Updating physical layer...", total=total_tasks, ) @@ -738,7 +749,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: if self.creation_progress is not None and self.creation_task is not None: if self.verbosity >= Verbosity.VERBOSE: self.creation_progress.live.console.print( - f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} [green]created[/green]" + f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} [green]created[/green]" ) self.creation_progress.update(self.creation_task, refresh=True, advance=1) @@ -749,7 +760,7 @@ def stop_creation_progress(self, success: bool = True) -> None: self.creation_progress.stop() self.creation_progress = None if success: - self.log_success("\nModel versions created successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Physical layer updated successfully") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -790,7 +801,7 @@ def start_promotion_progress( if self.promotion_progress is None: self.promotion_progress = Progress( TextColumn( - f"[bold blue]Virtually updating '{environment_naming_info.name}' environment views", + "[bold blue]Updating virtual layer", justify="right", ), BarColumn(bar_width=PROGRESS_BAR_WIDTH), @@ -816,7 +827,7 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) check_mark = f"{GREEN_CHECK_MARK} " if promoted else " " action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( - f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} {action_str}" + f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" ) self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) @@ -827,7 +838,7 @@ def stop_promotion_progress(self, success: bool = True) -> None: self.promotion_progress.stop() self.promotion_progress = None if success: - self.log_success("\nEnvironment views updated successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Virtual layer updated successfully") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -2617,7 +2628,8 @@ def show_row_diff( self._write(row_diff) -_CONSOLE: Console = NoopConsole() +# _CONSOLE: Console = NoopConsole() +_CONSOLE: Console = TerminalConsole() def set_console(console: Console) -> None: @@ -2733,111 +2745,70 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str: def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str: - if snapshot.is_model: - # only include time if interval < 1 day - fmt_func = ( - to_ds - if (interval[1] - interval[0]) >= datetime.timedelta(days=1).total_seconds() - else to_tstz - ) - return ( - f"insert {fmt_func(interval[0])} - {fmt_func(interval[1])}" - if snapshot.model.kind.is_incremental - or snapshot.model.kind.is_managed - or snapshot.model.kind.is_custom - else "" - ) + if snapshot.is_model and ( + snapshot.model.kind.is_incremental + or snapshot.model.kind.is_managed + or snapshot.model.kind.is_custom + ): + # include time if interval < 1 day + if (interval[1] - interval[0]) < datetime.timedelta(days=1).total_seconds() * 1000: + return f"insert {to_ds(interval[0])} {to_datetime(interval[0]).strftime('%H:%M:%S')}-{to_datetime(interval[1]).strftime('%H:%M:%S')}" + return f"insert {to_ds(interval[0])} - {to_ds(interval[1])}" return "" -def _create_evaluation_model_info( - batched_intervals: t.Dict[Snapshot, Intervals], - model_batch_sizes: t.Dict[Snapshot, int], - environment_naming_info: EnvironmentNamingInfo, - default_catalog: t.Optional[str], - dialect: t.Optional[DialectType], -) -> t.Tuple[t.Dict[Snapshot, t.Dict[str, t.Any]], t.Dict[str, int]]: - """Creates model information dictionaries for model evaluation progress bar. +def _justify_evaluation_model_info( + text: str, + length: int, + justify_direction: str = "left", + dots_side: str = "left", + prefix: str = "", + suffix: str = "", +) -> str: + """Format a model evaluation info string by justifying and truncating if needed. - Parameters: - batched_intervals: Dictionary mapping snapshot batches to their evaluation intervals - model_batch_sizes: Dictionary mapping snapshots to their batch sizes - environment_naming_info: Information about environment naming, needed to render model name - default_catalog: Optional default catalog name for rendering model name - dialect: Optional SQL dialect for rendering model name + Args: + text: The string to format + length: The desired number of characters in the returned string + justify_direction: The justification direction ("left" or "l" or "right" or "r") + dots_side: The side of the dots if truncation is needed ("left" or "l" or "right" or "r") + prefix: A prefix to add to the returned string + suffix: A suffix to add to the returned string Returns: - A tuple containing: - - Dictionary mapping snapshots to their model's display information - - Dictionary of output field names to column widths + The justified string, truncated with "..." if needed """ - model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {} - model_column_widths = {} - model_column_widths["display_name"] = 0 - model_column_widths["annotation"] = 0 - - for snapshot in batched_intervals: - model_info[snapshot] = {} - model_info[snapshot]["display_name"] = snapshot.display_name( - environment_naming_info, default_catalog, dialect=dialect - ) - model_column_widths["display_name"] = max( - model_column_widths["display_name"], len(model_info[snapshot]["display_name"]) - ) - - # The annotation includes audit results. We cannot build the audits result string - # until after evaluation occurs, but we must determine the annotation column width here. - # Therefore, we add enough padding for the longest possible audits result string. - audit_pad = 0 - if snapshot.is_model and snapshot.model.audits: - num_audits = len(snapshot.model.audits_with_args) - num_nonblocking_audits = sum( - 1 - for audit in snapshot.model.audits_with_args - if not audit[0].blocking - or ("blocking" in audit[1] and audit[1]["blocking"] == exp.false()) - ) - # make enough room for all audits to pass - audit_pad = len(f", {str(num_audits)} audits passed") - if num_nonblocking_audits: - # and add enough room for all nonblocking audits to fail - audit_pad += len(f", {str(num_nonblocking_audits)} audits failed X") # red X - audit_pad += 1 # closing bracket - - model_info[snapshot]["annotation"] = [ - _create_evaluation_model_annotation( - snapshot, - _format_evaluation_model_interval(snapshot, interval), - ) - for interval in batched_intervals[snapshot] - ] - model_column_widths["annotation"] = max( - model_column_widths["annotation"], - max(len(annotation) for annotation in model_info[snapshot]["annotation"]) + audit_pad, + full_text = f"{prefix}{text}{suffix}" + if len(full_text) <= length: + return ( + full_text.ljust(length) + if justify_direction.startswith("l") + else full_text.rjust(length) ) - model_column_widths["batch"] = 5 # number characters in default "[1/1]" - # do we need space for more than one digit? - if any(size > 9 for size in model_batch_sizes.values()): - model_column_widths["batch"] = ( - max(len(str(size)) for size in model_batch_sizes.values()) * 2 - ) + 3 # brackets and slash - - return model_info, model_column_widths + trunc_length = length - len(prefix) - len(suffix) + truncated_text = ( + "..." + text[-(trunc_length - 3) :] + if dots_side.startswith("l") + else text[: (trunc_length - 3)] + "..." + ) + return f"{prefix}{truncated_text}{suffix}" def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str: - if snapshot.is_audit or (snapshot.is_model and snapshot.model.kind.is_external): - return " \[run audits" + if snapshot.is_audit: + return "run standalone audit" + if snapshot.is_model and snapshot.model.kind.is_external: + return "run external model audits" if snapshot.model.kind.is_seed: - return " \[insert from seed file" + return "insert from seed file" if snapshot.model.kind.is_full: - return " \[full refresh" + return "full refresh" if snapshot.model.kind.is_view: - return " \[recreate view" + return "recreate view" if snapshot.model.kind.is_incremental_by_unique_key: - return " \[insert or update rows" + return "insert or update rows" if snapshot.model.kind.is_incremental_by_partition: - return " \[insert partition" + return "insert partition" - return f" \[{interval_info}" if interval_info else "" + return interval_info if interval_info else "" diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 5b01db5ff7..e34b4c4398 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -160,6 +160,41 @@ def evaluate( if not plan.requires_backfill: self.console.log_success("Virtual Update executed successfully") + model_kind_counts: t.Dict[str, int] = {} + audit_counts: t.Dict[str, int] = {} + for snapshot in snapshots.values(): + if snapshot.name in all_names: + if snapshot.is_audit: + audit_counts["standalone"] = audit_counts.get("standalone", 0) + 1 + if ( + snapshot.is_model + and snapshot.model_kind_name + and snapshot.model.kind + and not snapshot.model.kind.is_external + and not snapshot.model.kind.is_embedded + ): + kind_name = snapshot.model_kind_name + model_kind_counts[kind_name] = model_kind_counts.get(kind_name, 0) + 1 + if snapshot.is_model and snapshot.model.audits: + if snapshot.model.kind.is_external: + audit_counts["EXTERNAL model"] = audit_counts.get( + "EXTERNAL model", 0 + ) + len(snapshot.model.audits) + else: + audit_counts["model"] = audit_counts.get("model", 0) + len( + snapshot.model.audits + ) + + summary_str = ", ".join( + [f"{v} {k} model{'s' if v > 1 else ''}" for k, v in model_kind_counts.items()] + ) + for audit_type in ["EXTERNAL model", "model", "standalone"]: + if audit_type in audit_counts: + count = audit_counts[audit_type] + summary_str += f", {count} {audit_type} audit{'s' if count > 1 else ''}" + if summary_str: + self.console.log_status_update(f"Plan applied for {summary_str}") + execute_environment_statements( adapter=self.snapshot_evaluator.adapter, environment_statements=plan.environment_statements or [], diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index df25d21ca4..82fedb201f 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -129,15 +129,15 @@ def assert_new_env(result, new_env="prod", from_env="prod", initialize=True) -> def assert_model_versions_created(result) -> None: - assert "Model versions created successfully" in result.output + assert "Physical layer updated successfully" in result.output def assert_model_batches_evaluated(result) -> None: - assert "Model batches evaluated successfully" in result.output + assert "Model batches executed successfully" in result.output def assert_env_views_updated(result) -> None: - assert "Environment views updated successfully" in result.output + assert "Virtual layer updated successfully" in result.output def assert_backfill_success(result) -> None: @@ -293,8 +293,8 @@ def test_plan_verbose(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--verbose"], input="y\n" ) assert_plan_success(result) - assert "sqlmesh_example.seed_model created" in result.output - assert "sqlmesh_example.seed_model updated" in result.output + assert "sqlmesh_example.seed_model created" in result.output + assert "sqlmesh_example.seed_model updated" in result.output def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path): @@ -495,7 +495,7 @@ def test_plan_dev_no_prompts(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "dev", "--no-prompts"] ) assert "Apply - Backfill Tables [y/n]: " in result.output - assert "Model versions created successfully" not in result.output + assert "Physical layer updated successfully" not in result.output assert "Model batches executed successfully" not in result.output assert "The target environment has been updated successfully" not in result.output diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 860421fb46..5f80c88082 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1403,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog log = caplog.text assert "'not_null' audit error:" in log assert "'at_least_one_non_blocking' audit error:" in log - assert "Environment views updated successfully" in stdout + assert "Virtual layer updated successfully" in stdout def test_environment_statements(tmp_path: pathlib.Path): diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index 88b037c765..e9822a42b3 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -303,7 +303,7 @@ def test_plan( ) # TODO: Is this what we expect? assert text_output[2] == "" - assert text_output[3] == "Environment views updated successfully" + assert text_output[3] == "Virtual layer updated successfully" assert convert_all_html_output_to_tags(output) == [ ["pre", "span"], ["pre"] + ["span"] * 4, @@ -326,7 +326,7 @@ def test_run_dag( assert not output.stderr assert len(output.outputs) == 2 assert convert_all_html_output_to_text(output) == [ - "Model batches evaluated successfully", + "✔ Model batches executed successfully", "Run finished for environment 'prod'", ] assert get_all_html_output(output) == [ @@ -337,7 +337,7 @@ def test_run_dag( h( "span", {"style": SUCCESS_STYLE}, - "Model batches evaluated successfully", + "✔ Model batches executed successfully", autoescape=False, ), autoescape=False, From 27de7d4ad5a5c4f96d2cc1db85fc8f6f3b32ffdb Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Wed, 26 Mar 2025 09:21:07 -0500 Subject: [PATCH 10/23] Fix magics test --- tests/integrations/jupyter/test_magics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index e9822a42b3..e8c14aaef6 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -293,13 +293,13 @@ def test_plan( # the models and how long it took assert len(output.stdout.strip().split("\n")) == 46 assert not output.stderr - assert len(output.outputs) == 4 + assert len(output.outputs) == 5 text_output = convert_all_html_output_to_text(output) # TODO: Is this what we expect? # This has minor differences between CI/CD and local. assert "[2K" in text_output[0] assert text_output[1].startswith( - "Virtually updating 'prod' environment views ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" + "Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" ) # TODO: Is this what we expect? assert text_output[2] == "" From ac32a0c14bba290abab608f21d7c6827274f4aea Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Wed, 26 Mar 2025 10:06:37 -0500 Subject: [PATCH 11/23] Replace 'successfully' with check mark --- .circleci/continue_config.yml | 102 +++++++++++----------- sqlmesh/core/console.py | 8 +- tests/cli/test_cli.py | 16 ++-- tests/core/test_context.py | 2 +- tests/integrations/jupyter/test_magics.py | 7 +- 5 files changed, 68 insertions(+), 67 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 8bc469bd6e..e2518d1235 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -269,7 +269,7 @@ jobs: workflows: main_pr: jobs: - - doc_tests + # - doc_tests - style_and_cicd_tests: matrix: parameters: @@ -278,53 +278,53 @@ workflows: - "3.10" - "3.11" - "3.12" - - airflow_docker_tests: - requires: - - style_and_cicd_tests - filters: - branches: - only: - - main - - engine_tests_docker: - name: engine_<< matrix.engine >> - matrix: - parameters: - engine: - - duckdb - - postgres - - mysql - - mssql - - trino - - spark - - clickhouse - - clickhouse-cluster - - risingwave - - engine_tests_cloud: - name: cloud_engine_<< matrix.engine >> - context: - - sqlmesh_cloud_database_integration - requires: - - engine_tests_docker - matrix: - parameters: - engine: - - snowflake - - databricks - - redshift - - bigquery - - clickhouse-cloud - - athena - filters: - branches: - only: - - main - - trigger_private_tests: - requires: - - style_and_cicd_tests - filters: - branches: - only: - - main - - ui_style - - ui_test - - migration_test + # - airflow_docker_tests: + # requires: + # - style_and_cicd_tests + # filters: + # branches: + # only: + # - main + # - engine_tests_docker: + # name: engine_<< matrix.engine >> + # matrix: + # parameters: + # engine: + # - duckdb + # - postgres + # - mysql + # - mssql + # - trino + # - spark + # - clickhouse + # - clickhouse-cluster + # - risingwave + # - engine_tests_cloud: + # name: cloud_engine_<< matrix.engine >> + # context: + # - sqlmesh_cloud_database_integration + # requires: + # - engine_tests_docker + # matrix: + # parameters: + # engine: + # - snowflake + # - databricks + # - redshift + # - bigquery + # - clickhouse-cloud + # - athena + # filters: + # branches: + # only: + # - main + # - trigger_private_tests: + # requires: + # - style_and_cicd_tests + # filters: + # branches: + # only: + # - main + # - ui_style + # - ui_test + # - migration_test diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 4c8c4f6400..e9dec019c1 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -713,7 +713,7 @@ def stop_evaluation_progress(self, success: bool = True) -> None: if self.evaluation_progress_live: self.evaluation_progress_live.stop() if success: - self.log_success(f"{GREEN_CHECK_MARK} Model batches executed successfully") + self.log_success(f"{GREEN_CHECK_MARK} Model batches executed") self.evaluation_progress_live = None self.evaluation_total_progress = None @@ -760,7 +760,7 @@ def stop_creation_progress(self, success: bool = True) -> None: self.creation_progress.stop() self.creation_progress = None if success: - self.log_success(f"\n{GREEN_CHECK_MARK} Physical layer updated successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Physical layer updated") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -801,7 +801,7 @@ def start_promotion_progress( if self.promotion_progress is None: self.promotion_progress = Progress( TextColumn( - "[bold blue]Updating virtual layer", + "[bold blue]Updating virtual layer ", # space to align with other progress bars justify="right", ), BarColumn(bar_width=PROGRESS_BAR_WIDTH), @@ -838,7 +838,7 @@ def stop_promotion_progress(self, success: bool = True) -> None: self.promotion_progress.stop() self.promotion_progress = None if success: - self.log_success(f"\n{GREEN_CHECK_MARK} Virtual layer updated successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Virtual layer updated") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 82fedb201f..b94d92cd76 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -129,15 +129,15 @@ def assert_new_env(result, new_env="prod", from_env="prod", initialize=True) -> def assert_model_versions_created(result) -> None: - assert "Physical layer updated successfully" in result.output + assert "Physical layer updated" in result.output def assert_model_batches_evaluated(result) -> None: - assert "Model batches executed successfully" in result.output + assert "Model batches executed" in result.output def assert_env_views_updated(result) -> None: - assert "Virtual layer updated successfully" in result.output + assert "Virtual layer updated" in result.output def assert_backfill_success(result) -> None: @@ -154,7 +154,7 @@ def assert_plan_success(result, new_env="prod", from_env="prod") -> None: def assert_virtual_update(result) -> None: - assert "Virtual Update executed successfully" in result.output + assert "Virtual Update executed" in result.output def test_version(runner, tmp_path): @@ -268,7 +268,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag): ) assert result.exit_code == 0 assert_virtual_update(result) - assert "Model batches executed successfully" not in result.output + assert "Model batches executed" not in result.output def test_plan_auto_apply(runner, tmp_path): @@ -495,9 +495,9 @@ def test_plan_dev_no_prompts(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "dev", "--no-prompts"] ) assert "Apply - Backfill Tables [y/n]: " in result.output - assert "Physical layer updated successfully" not in result.output - assert "Model batches executed successfully" not in result.output - assert "The target environment has been updated successfully" not in result.output + assert "Physical layer updated" not in result.output + assert "Model batches executed" not in result.output + assert "The target environment has been updated" not in result.output def test_plan_dev_auto_apply(runner, tmp_path): diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 5f80c88082..a2b7c56005 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1403,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog log = caplog.text assert "'not_null' audit error:" in log assert "'at_least_one_non_blocking' audit error:" in log - assert "Virtual layer updated successfully" in stdout + assert "Virtual layer updated" in stdout def test_environment_statements(tmp_path: pathlib.Path): diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index e8c14aaef6..bbb0eeb391 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -303,12 +303,13 @@ def test_plan( ) # TODO: Is this what we expect? assert text_output[2] == "" - assert text_output[3] == "Virtual layer updated successfully" + assert text_output[3] == "✔ Virtual layer updated" assert convert_all_html_output_to_tags(output) == [ ["pre", "span"], ["pre"] + ["span"] * 4, ["pre"], ["pre", "span"], + ["pre"] + ["span"] * 9, ] @@ -326,7 +327,7 @@ def test_run_dag( assert not output.stderr assert len(output.outputs) == 2 assert convert_all_html_output_to_text(output) == [ - "✔ Model batches executed successfully", + "✔ Model batches executed", "Run finished for environment 'prod'", ] assert get_all_html_output(output) == [ @@ -337,7 +338,7 @@ def test_run_dag( h( "span", {"style": SUCCESS_STYLE}, - "✔ Model batches executed successfully", + "✔ Model batches executed", autoescape=False, ), autoescape=False, From c21af20aa0c471d5bafc9b7ad65054bfe347ed2d Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 27 Mar 2025 11:22:52 -0500 Subject: [PATCH 12/23] Tidy up --- sqlmesh/core/console.py | 7 +++---- sqlmesh/core/scheduler.py | 2 +- tests/cli/test_cli.py | 24 ++++++++++++------------ web/server/console.py | 8 ++++---- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index e9dec019c1..d448549be2 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -620,7 +620,7 @@ def start_evaluation_progress( self.evaluation_progress_live.start() self.evaluation_total_task = self.evaluation_total_progress.add_task( - "Evaluating models...", total=sum(batch_sizes.values()) + "Executing models...", total=sum(batch_sizes.values()) ) self.evaluation_model_batch_sizes = batch_sizes @@ -825,7 +825,7 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) if self.promotion_progress is not None and self.promotion_task is not None: if self.verbosity >= Verbosity.VERBOSE: check_mark = f"{GREEN_CHECK_MARK} " if promoted else " " - action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]" + action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" ) @@ -2628,8 +2628,7 @@ def show_row_diff( self._write(row_diff) -# _CONSOLE: Console = NoopConsole() -_CONSOLE: Console = TerminalConsole() +_CONSOLE: Console = NoopConsole() def set_console(console: Console) -> None: diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 56339a26dc..0b740992af 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -478,7 +478,7 @@ def evaluate_node(node: SchedulingUnit) -> None: try: assert execution_time # mypy assert deployability_index # mypy - audit_results = [] + audit_results = [] # so it exists for finally if `evaluate` raises audit_results = self.evaluate( snapshot=snapshot, start=start, diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index b94d92cd76..3d213b1988 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -128,22 +128,22 @@ def assert_new_env(result, new_env="prod", from_env="prod", initialize=True) -> ) in result.output -def assert_model_versions_created(result) -> None: +def assert_physical_layer_updated(result) -> None: assert "Physical layer updated" in result.output -def assert_model_batches_evaluated(result) -> None: +def assert_model_batches_executed(result) -> None: assert "Model batches executed" in result.output -def assert_env_views_updated(result) -> None: +def assert_virtual_layer_updated(result) -> None: assert "Virtual layer updated" in result.output def assert_backfill_success(result) -> None: - assert_model_versions_created(result) - assert_model_batches_evaluated(result) - assert_env_views_updated(result) + assert_physical_layer_updated(result) + assert_model_batches_executed(result) + assert_virtual_layer_updated(result) def assert_plan_success(result, new_env="prod", from_env="prod") -> None: @@ -243,8 +243,8 @@ def test_plan_restate_model(runner, tmp_path): assert_duckdb_test(result) assert "No changes to plan: project files match the `prod` environment" in result.output assert "sqlmesh_example.full_model [full refresh" in result.output - assert_model_batches_evaluated(result) - assert_env_views_updated(result) + assert_model_batches_executed(result) + assert_virtual_layer_updated(result) @pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"]) @@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev2", "dev", initialize=False) - assert_env_views_updated(result) + assert_virtual_layer_updated(result) assert_virtual_update(result) @@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev", initialize=False) - assert_env_views_updated(result) + assert_virtual_layer_updated(result) assert_virtual_update(result) @@ -718,7 +718,7 @@ def test_run_dev(runner, tmp_path, flag): # Confirm backfill occurs when we run non-backfilled dev env result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"]) assert result.exit_code == 0 - assert_model_batches_evaluated(result) + assert_model_batches_executed(result) @time_machine.travel(FREEZE_TIME) @@ -750,7 +750,7 @@ def test_run_cron_elapsed(runner, tmp_path): result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"]) assert result.exit_code == 0 - assert_model_batches_evaluated(result) + assert_model_batches_executed(result) def test_clean(runner, tmp_path): diff --git a/web/server/console.py b/web/server/console.py index df35e514ff..9deb9e1137 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -91,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None: def start_evaluation_progress( self, - batched_intervals: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -104,7 +104,7 @@ def start_evaluation_progress( name=snapshot.name, view_name=snapshot.display_name(environment_naming_info, default_catalog), ) - for snapshot, total_tasks in batched_intervals.items() + for snapshot, total_tasks in batch_sizes.items() } self.plan_apply_stage_tracker.add_stage( models.PlanStage.backfill, @@ -128,8 +128,8 @@ def update_snapshot_evaluation_progress( interval: Interval, batch_idx: int, duration_ms: t.Optional[int], - audits_passed: int, - audits_failed: int, + num_audits_passed: int, + num_audits_failed: int, ) -> None: if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill: task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name] From a7a8200a3bc2109e4361927a3016e2bdc7fe6010 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 27 Mar 2025 11:24:41 -0500 Subject: [PATCH 13/23] Revert plan summary string --- sqlmesh/core/plan/evaluator.py | 35 ---------------------------------- 1 file changed, 35 deletions(-) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index e34b4c4398..5b01db5ff7 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -160,41 +160,6 @@ def evaluate( if not plan.requires_backfill: self.console.log_success("Virtual Update executed successfully") - model_kind_counts: t.Dict[str, int] = {} - audit_counts: t.Dict[str, int] = {} - for snapshot in snapshots.values(): - if snapshot.name in all_names: - if snapshot.is_audit: - audit_counts["standalone"] = audit_counts.get("standalone", 0) + 1 - if ( - snapshot.is_model - and snapshot.model_kind_name - and snapshot.model.kind - and not snapshot.model.kind.is_external - and not snapshot.model.kind.is_embedded - ): - kind_name = snapshot.model_kind_name - model_kind_counts[kind_name] = model_kind_counts.get(kind_name, 0) + 1 - if snapshot.is_model and snapshot.model.audits: - if snapshot.model.kind.is_external: - audit_counts["EXTERNAL model"] = audit_counts.get( - "EXTERNAL model", 0 - ) + len(snapshot.model.audits) - else: - audit_counts["model"] = audit_counts.get("model", 0) + len( - snapshot.model.audits - ) - - summary_str = ", ".join( - [f"{v} {k} model{'s' if v > 1 else ''}" for k, v in model_kind_counts.items()] - ) - for audit_type in ["EXTERNAL model", "model", "standalone"]: - if audit_type in audit_counts: - count = audit_counts[audit_type] - summary_str += f", {count} {audit_type} audit{'s' if count > 1 else ''}" - if summary_str: - self.console.log_status_update(f"Plan applied for {summary_str}") - execute_environment_statements( adapter=self.snapshot_evaluator.adapter, environment_statements=plan.environment_statements or [], From 238a7e72c79bf6f6b6b4fa0fad4bea492ee2bcdd Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 27 Mar 2025 11:46:08 -0500 Subject: [PATCH 14/23] fix tests --- tests/cli/test_cli.py | 2 +- tests/integrations/jupyter/test_magics.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 3d213b1988..37244301cc 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -294,7 +294,7 @@ def test_plan_verbose(runner, tmp_path): ) assert_plan_success(result) assert "sqlmesh_example.seed_model created" in result.output - assert "sqlmesh_example.seed_model updated" in result.output + assert "sqlmesh_example.seed_model promoted" in result.output def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path): diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index bbb0eeb391..f4267a449a 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -293,7 +293,7 @@ def test_plan( # the models and how long it took assert len(output.stdout.strip().split("\n")) == 46 assert not output.stderr - assert len(output.outputs) == 5 + assert len(output.outputs) == 4 text_output = convert_all_html_output_to_text(output) # TODO: Is this what we expect? # This has minor differences between CI/CD and local. @@ -309,7 +309,6 @@ def test_plan( ["pre"] + ["span"] * 4, ["pre"], ["pre", "span"], - ["pre"] + ["span"] * 9, ] From 087673ded3637709597cce00dacb6f17243fa941 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 27 Mar 2025 17:12:35 -0500 Subject: [PATCH 15/23] Re-enable CI tests --- .circleci/continue_config.yml | 102 +++++++++++++++++----------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index e2518d1235..8bc469bd6e 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -269,7 +269,7 @@ jobs: workflows: main_pr: jobs: - # - doc_tests + - doc_tests - style_and_cicd_tests: matrix: parameters: @@ -278,53 +278,53 @@ workflows: - "3.10" - "3.11" - "3.12" - # - airflow_docker_tests: - # requires: - # - style_and_cicd_tests - # filters: - # branches: - # only: - # - main - # - engine_tests_docker: - # name: engine_<< matrix.engine >> - # matrix: - # parameters: - # engine: - # - duckdb - # - postgres - # - mysql - # - mssql - # - trino - # - spark - # - clickhouse - # - clickhouse-cluster - # - risingwave - # - engine_tests_cloud: - # name: cloud_engine_<< matrix.engine >> - # context: - # - sqlmesh_cloud_database_integration - # requires: - # - engine_tests_docker - # matrix: - # parameters: - # engine: - # - snowflake - # - databricks - # - redshift - # - bigquery - # - clickhouse-cloud - # - athena - # filters: - # branches: - # only: - # - main - # - trigger_private_tests: - # requires: - # - style_and_cicd_tests - # filters: - # branches: - # only: - # - main - # - ui_style - # - ui_test - # - migration_test + - airflow_docker_tests: + requires: + - style_and_cicd_tests + filters: + branches: + only: + - main + - engine_tests_docker: + name: engine_<< matrix.engine >> + matrix: + parameters: + engine: + - duckdb + - postgres + - mysql + - mssql + - trino + - spark + - clickhouse + - clickhouse-cluster + - risingwave + - engine_tests_cloud: + name: cloud_engine_<< matrix.engine >> + context: + - sqlmesh_cloud_database_integration + requires: + - engine_tests_docker + matrix: + parameters: + engine: + - snowflake + - databricks + - redshift + - bigquery + - clickhouse-cloud + - athena + filters: + branches: + only: + - main + - trigger_private_tests: + requires: + - style_and_cicd_tests + filters: + branches: + only: + - main + - ui_style + - ui_test + - migration_test From 2c8f97317b985bcbc22f151ab5b07e39e4ecfca5 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 28 Mar 2025 10:23:18 -0500 Subject: [PATCH 16/23] PR feedback --- sqlmesh/core/scheduler.py | 36 ++++++++++++++++++++---------------- tests/core/test_scheduler.py | 2 -- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 0b740992af..00dfc2454e 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -166,10 +166,8 @@ def evaluate( execution_time: TimeLike, deployability_index: DeployabilityIndex, batch_index: int, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: t.Optional[str], **kwargs: t.Any, - ) -> t.List[AuditResult]: + ) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]: """Evaluate a snapshot and add the processed interval to the state sync. Args: @@ -183,7 +181,7 @@ def evaluate( kwargs: Additional kwargs to pass to the renderer. Returns: - List of audit results from the evaluation. + Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn. """ validate_date_range(start, end) @@ -213,6 +211,7 @@ def evaluate( ) audit_errors_to_raise: t.List[AuditError] = [] + audit_errors_to_warn: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): error = AuditError( audit_name=audit_result.audit.name, @@ -230,21 +229,13 @@ def evaluate( if audit_result.blocking: audit_errors_to_raise.append(error) else: - display_name = snapshot.display_name( - environment_naming_info, - default_catalog, - self.snapshot_evaluator.adapter.dialect, - ) - self.console.log_warning( - f"\n{display_name}: {error}.", - f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", - ) + audit_errors_to_warn.append(error) if audit_errors_to_raise: raise NodeAuditsErrors(audit_errors_to_raise) self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable) - return audit_results + return audit_results, audit_errors_to_warn def run( self, @@ -475,11 +466,12 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_start_ts = now_timestamp() evaluation_duration_ms: t.Optional[int] = None + audit_results: t.List[AuditResult] = [] + audit_errors_to_warn: t.List[AuditError] = [] try: assert execution_time # mypy assert deployability_index # mypy - audit_results = [] # so it exists for finally if `evaluate` raises - audit_results = self.evaluate( + audit_results, audit_errors_to_warn = self.evaluate( snapshot=snapshot, start=start, end=end, @@ -489,6 +481,18 @@ def evaluate_node(node: SchedulingUnit) -> None: environment_naming_info=environment_naming_info, default_catalog=self.default_catalog, ) + + for audit_error in audit_errors_to_warn: + display_name = snapshot.display_name( + environment_naming_info, + self.default_catalog, + self.snapshot_evaluator.adapter.dialect, + ) + self.console.log_warning( + f"\n{display_name}: {audit_error}.", + f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}", + ) + evaluation_duration_ms = now_timestamp() - execution_start_ts finally: num_audits = len(audit_results) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index dbe62f8a44..cfe3bf52bb 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -528,8 +528,6 @@ def _evaluate(): to_datetime("2022-01-03"), DeployabilityIndex.all_deployable(), 0, - EnvironmentNamingInfo(), - None, ) evaluator_audit_mock.return_value = [ From 0d038fa8e7c66faf5231e41a1c6141187ae2d36d Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 28 Mar 2025 16:35:15 -0500 Subject: [PATCH 17/23] Fix virtual layer prog bar label --- sqlmesh/core/console.py | 31 +++++++++++++------------------ sqlmesh/core/scheduler.py | 2 -- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index d448549be2..fe197ba45d 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -516,9 +516,13 @@ def show_linter_violations( pass -def make_progress_bar(message: str, console: t.Optional[RichConsole] = None) -> Progress: +def make_progress_bar( + message: str, + console: t.Optional[RichConsole] = None, + justify: t.Literal["default", "left", "center", "right", "full"] = "right", +) -> Progress: return Progress( - TextColumn(f"[bold blue]{message}", justify="right"), + TextColumn(f"[bold blue]{message}", justify=justify), BarColumn(bar_width=PROGRESS_BAR_WIDTH), "[progress.percentage]{task.percentage:>3.1f}%", "•", @@ -696,7 +700,7 @@ def update_snapshot_evaluation_progress( ) self.evaluation_progress_live.console.print( - f"{GREEN_CHECK_MARK} {batch}{display_name}{annotation} {duration}" + f"{batch}{display_name}{annotation} {duration}" ) self.evaluation_total_progress.update( @@ -749,7 +753,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: if self.creation_progress is not None and self.creation_task is not None: if self.verbosity >= Verbosity.VERBOSE: self.creation_progress.live.console.print( - f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} [green]created[/green]" + f"{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} [green]created[/green]" ) self.creation_progress.update(self.creation_task, refresh=True, advance=1) @@ -799,16 +803,8 @@ def start_promotion_progress( ) -> None: """Indicates that a new snapshot promotion progress has begun.""" if self.promotion_progress is None: - self.promotion_progress = Progress( - TextColumn( - "[bold blue]Updating virtual layer ", # space to align with other progress bars - justify="right", - ), - BarColumn(bar_width=PROGRESS_BAR_WIDTH), - "[progress.percentage]{task.percentage:>3.1f}%", - "•", - TimeElapsedColumn(), - console=self.console, + self.promotion_progress = make_progress_bar( + "Updating virtual layer ", self.console, justify="left" ) self.promotion_progress.start() @@ -824,10 +820,9 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) """Update the snapshot promotion progress.""" if self.promotion_progress is not None and self.promotion_task is not None: if self.verbosity >= Verbosity.VERBOSE: - check_mark = f"{GREEN_CHECK_MARK} " if promoted else " " action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" self.promotion_progress.live.console.print( - f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" + f"{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" ) self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) @@ -2759,8 +2754,8 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> def _justify_evaluation_model_info( text: str, length: int, - justify_direction: str = "left", - dots_side: str = "left", + justify_direction: t.Literal["left", "right"] = "left", + dots_side: t.Literal["left", "right"] = "left", prefix: str = "", suffix: str = "", ) -> str: diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 00dfc2454e..0052cb785b 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -478,8 +478,6 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_time=execution_time, deployability_index=deployability_index, batch_index=batch_idx, - environment_naming_info=environment_naming_info, - default_catalog=self.default_catalog, ) for audit_error in audit_errors_to_warn: From 361bb480cfcc846c8c16a8bdbc5866d6de54aab4 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 28 Mar 2025 17:02:00 -0500 Subject: [PATCH 18/23] Fix magics test --- tests/integrations/jupyter/test_magics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index f4267a449a..9519668ab3 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -299,14 +299,14 @@ def test_plan( # This has minor differences between CI/CD and local. assert "[2K" in text_output[0] assert text_output[1].startswith( - "Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" + "Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" ) # TODO: Is this what we expect? assert text_output[2] == "" assert text_output[3] == "✔ Virtual layer updated" assert convert_all_html_output_to_tags(output) == [ ["pre", "span"], - ["pre"] + ["span"] * 4, + ["pre"] + ["span"] * 5, ["pre"], ["pre", "span"], ] From df3c363d32c29897a0f53f5941e985954d46139d Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Mon, 31 Mar 2025 10:11:10 -0500 Subject: [PATCH 19/23] Move 2 spaces from check marks to model name column --- sqlmesh/core/console.py | 2 +- tests/cli/test_cli.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index fe197ba45d..443b230a67 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -540,7 +540,7 @@ class TerminalConsole(Console): PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = { "batch": 7, - "name": 50, + "name": 52, "annotation": 50, "duration": 8, } diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 37244301cc..518474056c 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -242,7 +242,7 @@ def test_plan_restate_model(runner, tmp_path): assert result.exit_code == 0 assert_duckdb_test(result) assert "No changes to plan: project files match the `prod` environment" in result.output - assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output assert_model_batches_executed(result) assert_virtual_layer_updated(result) @@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path): assert result.exit_code == 0 assert "+ item_id + 1 AS item_id," in result.output assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output - assert "sqlmesh_example.full_model [full refresh" in result.output - assert "sqlmesh_example.incremental_model [insert" not in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.incremental_model [insert" not in result.output assert_backfill_success(result) From 349279e2377b687a72d2a7b0e258d6ceb3b38c45 Mon Sep 17 00:00:00 2001 From: Trey Spiller <1831878+treysp@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:30:21 -0500 Subject: [PATCH 20/23] Update sqlmesh/core/console.py Co-authored-by: Sung Won Chung --- sqlmesh/core/console.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 443b230a67..1c3564857f 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2803,6 +2803,6 @@ def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Opt if snapshot.model.kind.is_incremental_by_unique_key: return "insert or update rows" if snapshot.model.kind.is_incremental_by_partition: - return "insert partition" + return "insert partitions" return interval_info if interval_info else "" From cbaca25772b5cc188a8732d094c5c90f1a9d3754 Mon Sep 17 00:00:00 2001 From: Trey Spiller <1831878+treysp@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:30:29 -0500 Subject: [PATCH 21/23] Update sqlmesh/core/console.py Co-authored-by: Sung Won Chung --- sqlmesh/core/console.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 1c3564857f..9dc8da8b01 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2801,7 +2801,7 @@ def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Opt if snapshot.model.kind.is_view: return "recreate view" if snapshot.model.kind.is_incremental_by_unique_key: - return "insert or update rows" + return "insert/update rows" if snapshot.model.kind.is_incremental_by_partition: return "insert partitions" From 6f09f8d36749dfa99f332b4faa7b2fe21e45add2 Mon Sep 17 00:00:00 2001 From: Trey Spiller <1831878+treysp@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:30:38 -0500 Subject: [PATCH 22/23] Update sqlmesh/core/console.py Co-authored-by: Sung Won Chung --- sqlmesh/core/console.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 9dc8da8b01..1c342c6354 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2793,7 +2793,7 @@ def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Opt if snapshot.is_audit: return "run standalone audit" if snapshot.is_model and snapshot.model.kind.is_external: - return "run external model audits" + return "run external audits" if snapshot.model.kind.is_seed: return "insert from seed file" if snapshot.model.kind.is_full: From ef04ff790654e4df739e53526bfec555714fb1e6 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Mon, 31 Mar 2025 13:31:49 -0500 Subject: [PATCH 23/23] Align virtual prog bar labels --- sqlmesh/core/console.py | 6 ++++-- tests/cli/test_cli.py | 14 +++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 1c342c6354..1c267bd6fb 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -820,7 +820,9 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) """Update the snapshot promotion progress.""" if self.promotion_progress is not None and self.promotion_task is not None: if self.verbosity >= Verbosity.VERBOSE: - action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" + action_str = ( + "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" + ).ljust(len("promoted")) self.promotion_progress.live.console.print( f"{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" ) @@ -2795,7 +2797,7 @@ def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Opt if snapshot.is_model and snapshot.model.kind.is_external: return "run external audits" if snapshot.model.kind.is_seed: - return "insert from seed file" + return "insert seed file" if snapshot.model.kind.is_full: return "full refresh" if snapshot.model.kind.is_view: diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 518474056c..fdcdf4486b 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -293,8 +293,8 @@ def test_plan_verbose(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--verbose"], input="y\n" ) assert_plan_success(result) - assert "sqlmesh_example.seed_model created" in result.output - assert "sqlmesh_example.seed_model promoted" in result.output + assert "sqlmesh_example.seed_model created" in result.output + assert "sqlmesh_example.seed_model promoted" in result.output def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path): @@ -552,7 +552,7 @@ def test_plan_nonbreaking(runner, tmp_path): assert "+ 'a' AS new_col" in result.output assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output - assert "sqlmesh_example.incremental_model [insert" in result.output + assert "sqlmesh_example.incremental_model [insert" in result.output assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output assert_backfill_success(result) @@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path): assert "+ item_id + 1 AS item_id," not in result.output assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model [insert" in result.output - assert "sqlmesh_example__dev.full_model [full refresh" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) @@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path): "Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output ) # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model [insert" in result.output - assert "sqlmesh_example__dev.full_model [full refresh" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result)