diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index c1642ccd43..9b2250ab1a 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -147,7 +147,7 @@ class Config(BaseConfig): } _connection_config_validator = connection_config_validator - _scheduler_config_validator = scheduler_config_validator + _scheduler_config_validator = scheduler_config_validator # type: ignore _variables_validator = variables_validator @field_validator("gateways", mode="before") diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index c1dd42d78e..1c267bd6fb 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -24,6 +24,7 @@ from rich.syntax import Syntax from rich.table import Table from rich.tree import Tree +from sqlglot import exp from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.linter.rule import RuleViolation @@ -34,11 +35,12 @@ SnapshotId, SnapshotInfoLike, ) +from sqlmesh.core.snapshot.definition import Interval from sqlmesh.core.test import ModelTest from sqlmesh.utils import rich as srich from sqlmesh.utils import Verbosity from sqlmesh.utils.concurrency import NodeExecutionFailedError -from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds +from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_datetime from sqlmesh.utils.errors import ( PythonModelEvalError, NodeAuditsErrors, @@ -72,6 +74,9 @@ PROGRESS_BAR_WIDTH = 40 LINE_WRAP_WIDTH = 100 +CHECK_MARK = "\u2714" +GREEN_CHECK_MARK = f"[green]{CHECK_MARK}[/green]" +RED_X_MARK = "\u274c" class Console(abc.ABC): @@ -91,7 +96,7 @@ def stop_plan_evaluation(self) -> None: @abc.abstractmethod def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -103,7 +108,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: 
@abc.abstractmethod def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: """Updates the snapshot evaluation progress.""" @@ -341,7 +352,7 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -351,7 +362,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: pass def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: pass @@ -499,9 +516,13 @@ def show_linter_violations( pass -def make_progress_bar(message: str, console: t.Optional[RichConsole] = None) -> Progress: +def make_progress_bar( + message: str, + console: t.Optional[RichConsole] = None, + justify: t.Literal["default", "left", "center", "right", "full"] = "right", +) -> Progress: return Progress( - TextColumn(f"[bold blue]{message}", justify="right"), + TextColumn(f"[bold blue]{message}", justify=justify), BarColumn(bar_width=PROGRESS_BAR_WIDTH), "[progress.percentage]{task.percentage:>3.1f}%", "•", @@ -517,6 +538,13 @@ class TerminalConsole(Console): TABLE_DIFF_SOURCE_BLUE = "#0248ff" + PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = { + "batch": 7, + "name": 52, + "annotation": 50, + "duration": 8, + } + def __init__( self, console: t.Optional[RichConsole] = None, @@ -532,7 +560,6 @@ def __init__( self.evaluation_total_task: t.Optional[TaskID] = None self.evaluation_model_progress: t.Optional[Progress] = None self.evaluation_model_tasks: t.Dict[str, 
TaskID] = {} - self.evaluation_model_batches: t.Dict[Snapshot, int] = {} # Put in temporary values that are replaced when evaluating self.environment_naming_info = EnvironmentNamingInfo() @@ -573,13 +600,15 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: """Indicates that a new snapshot evaluation progress has begun.""" if not self.evaluation_progress_live: - self.evaluation_total_progress = make_progress_bar("Evaluating models", self.console) + self.evaluation_total_progress = make_progress_bar( + "Executing model batches", self.console + ) self.evaluation_model_progress = Progress( TextColumn("{task.fields[view_name]}", justify="right"), @@ -595,26 +624,34 @@ def start_evaluation_progress( self.evaluation_progress_live.start() self.evaluation_total_task = self.evaluation_total_progress.add_task( - "Evaluating models...", total=sum(batches.values()) + "Executing models...", total=sum(batch_sizes.values()) ) - self.evaluation_model_batches = batches + self.evaluation_model_batch_sizes = batch_sizes self.environment_naming_info = environment_naming_info self.default_catalog = default_catalog def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks: display_name = snapshot.display_name( - self.environment_naming_info, self.default_catalog, dialect=self.dialect + self.environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task( f"Evaluating {display_name}...", view_name=display_name, - total=self.evaluation_model_batches[snapshot], + total=self.evaluation_model_batch_sizes[snapshot], ) def 
update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: """Update the snapshot evaluation progress.""" if ( @@ -622,11 +659,48 @@ def update_snapshot_evaluation_progress( and self.evaluation_model_progress and self.evaluation_progress_live ): - total_batches = self.evaluation_model_batches[snapshot] + total_batches = self.evaluation_model_batch_sizes[snapshot] + batch_num = str(batch_idx + 1).rjust(len(str(total_batches))) + batch = f"[{batch_num}/{total_batches}]".ljust(self.PROGRESS_BAR_COLUMN_WIDTHS["batch"]) if duration_ms: + display_name = _justify_evaluation_model_info( + snapshot.display_name( + self.environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, + ), + self.PROGRESS_BAR_COLUMN_WIDTHS["name"], + ) + + annotation = _create_evaluation_model_annotation( + snapshot, _format_evaluation_model_interval(snapshot, interval) + ) + audits_str = "" + if num_audits_passed: + audits_str += f" {CHECK_MARK}{num_audits_passed}" + if num_audits_failed: + audits_str += f" {RED_X_MARK}{num_audits_failed}" + audits_str = f", audits{audits_str}" if audits_str else "" + + annotation_width = self.PROGRESS_BAR_COLUMN_WIDTHS["annotation"] + annotation = _justify_evaluation_model_info( + annotation + audits_str, + annotation_width + if not num_audits_failed + else annotation_width - 1, # -1 for RED_X_MARK's extra space + dots_side="right", + prefix=" \[", + suffix="]", + ) + annotation = annotation.replace(CHECK_MARK, GREEN_CHECK_MARK) + + duration = f"{(duration_ms / 1000.0):.2f}s".ljust( + self.PROGRESS_BAR_COLUMN_WIDTHS["duration"] + ) + self.evaluation_progress_live.console.print( - f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, 
self.default_catalog, dialect=self.dialect)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s" + f"{batch}{display_name}{annotation} {duration}" ) self.evaluation_total_progress.update( @@ -643,14 +717,14 @@ def stop_evaluation_progress(self, success: bool = True) -> None: if self.evaluation_progress_live: self.evaluation_progress_live.stop() if success: - self.log_success("Model batches executed successfully") + self.log_success(f"{GREEN_CHECK_MARK} Model batches executed") self.evaluation_progress_live = None self.evaluation_total_progress = None self.evaluation_total_task = None self.evaluation_model_progress = None self.evaluation_model_tasks = {} - self.evaluation_model_batches = {} + self.evaluation_model_batch_sizes = {} self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -662,12 +736,12 @@ def start_creation_progress( ) -> None: """Indicates that a new creation progress has begun.""" if self.creation_progress is None: - message = "Creating physical table" if total_tasks == 1 else "Creating physical tables" - self.creation_progress = make_progress_bar(message, self.console) + self.creation_progress = make_progress_bar("Updating physical layer", self.console) + self._print("") self.creation_progress.start() self.creation_task = self.creation_progress.add_task( - "Creating physical tables...", + "Updating physical layer...", total=total_tasks, ) @@ -679,7 +753,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: if self.creation_progress is not None and self.creation_task is not None: if self.verbosity >= Verbosity.VERBOSE: self.creation_progress.live.console.print( - f"{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} [green]created[/green]" + f"{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} 
[green]created[/green]" ) self.creation_progress.update(self.creation_task, refresh=True, advance=1) @@ -690,7 +764,7 @@ def stop_creation_progress(self, success: bool = True) -> None: self.creation_progress.stop() self.creation_progress = None if success: - self.log_success("Model versions created successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Physical layer updated") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -729,21 +803,13 @@ def start_promotion_progress( ) -> None: """Indicates that a new snapshot promotion progress has begun.""" if self.promotion_progress is None: - self.promotion_progress = Progress( - TextColumn( - f"[bold blue]Virtually Updating '{environment_naming_info.name}'", - justify="right", - ), - BarColumn(bar_width=PROGRESS_BAR_WIDTH), - "[progress.percentage]{task.percentage:>3.1f}%", - "•", - TimeElapsedColumn(), - console=self.console, + self.promotion_progress = make_progress_bar( + "Updating virtual layer ", self.console, justify="left" ) self.promotion_progress.start() self.promotion_task = self.promotion_progress.add_task( - f"Virtually Updating {environment_naming_info.name}...", + f"Virtually updating {environment_naming_info.name}...", total=total_tasks, ) @@ -754,9 +820,11 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) """Update the snapshot promotion progress.""" if self.promotion_progress is not None and self.promotion_task is not None: if self.verbosity >= Verbosity.VERBOSE: - action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" + action_str = ( + "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" + ).ljust(len("promoted")) self.promotion_progress.live.console.print( - f"{snapshot.display_name(self.environment_naming_info, self.default_catalog, dialect=self.dialect)} {action_str}" + f"{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < 
Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}" ) self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) @@ -767,7 +835,7 @@ def stop_promotion_progress(self, success: bool = True) -> None: self.promotion_progress.stop() self.promotion_progress = None if success: - self.log_success("Target environment updated successfully") + self.log_success(f"\n{GREEN_CHECK_MARK} Virtual layer updated") self.environment_naming_info = EnvironmentNamingInfo() self.default_catalog = None @@ -973,7 +1041,7 @@ def _show_summary_tree_for( for s_id in sorted(added_snapshot_ids): snapshot = context_diff.snapshots[s_id] added_tree.add( - f"[added]{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[added]{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) tree.add(self._limit_model_names(added_tree, self.verbosity)) if removed_snapshot_ids: @@ -981,7 +1049,7 @@ def _show_summary_tree_for( for s_id in sorted(removed_snapshot_ids): snapshot_table_info = context_diff.removed_snapshots[s_id] removed_tree.add( - f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) tree.add(self._limit_model_names(removed_tree, self.verbosity)) if modified_snapshot_ids: @@ -991,7 +1059,9 @@ def _show_summary_tree_for( for s_id in modified_snapshot_ids: name = s_id.name display_name = context_diff.snapshots[s_id].display_name( - environment_naming_info, default_catalog, dialect=self.dialect + environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) if context_diff.directly_modified(name): direct.add( @@ 
-1068,7 +1138,7 @@ def _prompt_categorize( if not no_diff: self.show_sql(plan.context_diff.text_diff(snapshot.name)) tree = Tree( - f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) indirect_tree = None @@ -1078,7 +1148,7 @@ def _prompt_categorize( indirect_tree = Tree("[indirect]Indirectly Modified Children:") tree.add(indirect_tree) indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) @@ -1096,7 +1166,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if context_diff.directly_modified(snapshot.name): category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] tree = Tree( - f"\n[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({category_str})" + f"\n[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({category_str})" ) indirect_tree = None for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): @@ -1108,13 +1178,13 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st child_snapshot.change_category ] indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({child_category_str})" + 
f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({child_category_str})" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) elif context_diff.metadata_updated(snapshot.name): tree = Tree( - f"\n[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"\n[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) else: continue @@ -1141,7 +1211,9 @@ def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> N preview_modifier = " ([orange1]preview[/orange1])" display_name = snapshot.display_name( - plan.environment_naming_info, default_catalog, dialect=self.dialect + plan.environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) backfill.add( f"{display_name}: \\[{_format_missing_intervals(snapshot, missing)}]{preview_modifier}" @@ -1286,6 +1358,7 @@ def log_warning(self, short_message: str, long_message: t.Optional[str] = None) logger.warning(long_message or short_message) if not self.ignore_warnings: if long_message: + file_path = None for handler in logger.root.handlers: if isinstance(handler, logging.FileHandler): file_path = handler.baseFilename @@ -1298,7 +1371,7 @@ def log_warning(self, short_message: str, long_message: t.Optional[str] = None) self._print(message_formatted) def log_success(self, message: str) -> None: - self._print(f"\n[green]{message}[/green]\n") + self._print(f"[green]{message}[/green]\n") def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID: id = uuid.uuid4() @@ -1538,7 +1611,9 @@ def _snapshot_change_choices( use_rich_formatting: bool = True, ) -> t.Dict[SnapshotChangeCategory, str]: direct = 
snapshot.display_name( - environment_naming_info, default_catalog, dialect=self.dialect + environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) if use_rich_formatting: direct = f"[direct]{direct}[/direct]" @@ -2007,7 +2082,7 @@ def show_model_difference_summary( else: for snapshot in added_models: self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) added_snapshot_audits = {s for s in added_snapshots if s.is_audit} @@ -2015,7 +2090,7 @@ def show_model_difference_summary( self._print("\n**Added Standalone Audits:**") for snapshot in sorted(added_snapshot_audits): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) removed_snapshot_table_infos = set(context_diff.removed_snapshots.values()) @@ -2034,7 +2109,7 @@ def show_model_difference_summary( else: for snapshot_table_info in removed_models: self._print( - f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit} @@ -2042,7 +2117,7 @@ def show_model_difference_summary( self._print("\n**Removed Standalone Audits:**") for snapshot_table_info in sorted(removed_audit_snapshot_table_infos): self._print( - f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- 
`{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) modified_snapshots = { @@ -2063,7 +2138,7 @@ def show_model_difference_summary( self._print("\n**Directly Modified:**") for snapshot in sorted(directly_modified): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) if not no_diff: self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```") @@ -2076,20 +2151,20 @@ def show_model_difference_summary( and modified_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD ): self._print( - f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`\n" + f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`\n" f"- `.... 
{modified_length-2} more ....`\n" - f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) else: for snapshot in indirectly_modified: self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) if metadata_modified: self._print("\n**Metadata Updated:**") for snapshot in sorted(metadata_modified): self._print( - f"- `{snapshot.display_name(environment_naming_info, default_catalog, dialect=self.dialect)}`" + f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`" ) def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: @@ -2109,7 +2184,9 @@ def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> N preview_modifier = " (**preview**)" display_name = snapshot.display_name( - plan.environment_naming_info, default_catalog, dialect=self.dialect + plan.environment_naming_info, + default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) snapshots.append( f"* `{display_name}`: [{_format_missing_intervals(snapshot, missing)}]{preview_modifier}" @@ -2133,7 +2210,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if context_diff.directly_modified(snapshot.name): category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] tree = Tree( - f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({category_str})" + f"[bold][direct]Directly Modified: 
{snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({category_str})" ) indirect_tree = None for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): @@ -2145,13 +2222,13 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st child_snapshot.change_category ] indirect_tree.add( - f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)} ({child_category_str})" + f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} ({child_category_str})" ) if indirect_tree: indirect_tree = self._limit_model_names(indirect_tree, self.verbosity) elif context_diff.metadata_updated(snapshot.name): tree = Tree( - f"[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog, dialect=self.dialect)}" + f"[bold][metadata]Metadata Updated: {snapshot.display_name(plan.environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}" ) else: continue @@ -2243,27 +2320,37 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self.evaluation_batches = batches + self.evaluation_model_batch_sizes = batch_sizes self.evaluation_environment_naming_info = environment_naming_info self.default_catalog = default_catalog def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: if not self.evaluation_batch_progress.get(snapshot.snapshot_id): display_name = snapshot.display_name( - self.evaluation_environment_naming_info, self.default_catalog, dialect=self.dialect + 
self.evaluation_environment_naming_info, + self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, + dialect=self.dialect, ) self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0) - print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}") + print( + f"Starting '{display_name}', Total batches: {self.evaluation_model_batch_sizes[snapshot]}" + ) def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id] - total_batches = self.evaluation_batches[snapshot] + total_batches = self.evaluation_model_batch_sizes[snapshot] loaded_batches += 1 self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches) @@ -2275,7 +2362,7 @@ def update_snapshot_evaluation_progress( total_finished_loading = len( [ s - for s, total in self.evaluation_batches.items() + for s, total in self.evaluation_model_batch_sizes.items() if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total ] ) @@ -2401,19 +2488,27 @@ def stop_plan_evaluation(self) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: - self._write(f"Starting evaluation for {len(batches)} snapshots") + self._write(f"Starting evaluation for {sum(batch_sizes.values())} snapshots") def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self._write(f"Evaluating {snapshot.name}") def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + 
duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: - self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms") + self._write( + f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}" + ) def stop_evaluation_progress(self, success: bool = True) -> None: self._write(f"Stopping evaluation with success={success}") @@ -2643,3 +2738,73 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str: msg = msg.replace("\n", "\n ") error_messages.append(msg) return " " + "\n".join(error_messages) + + +def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str: + if snapshot.is_model and ( + snapshot.model.kind.is_incremental + or snapshot.model.kind.is_managed + or snapshot.model.kind.is_custom + ): + # include time if interval < 1 day + if (interval[1] - interval[0]) < datetime.timedelta(days=1).total_seconds() * 1000: + return f"insert {to_ds(interval[0])} {to_datetime(interval[0]).strftime('%H:%M:%S')}-{to_datetime(interval[1]).strftime('%H:%M:%S')}" + return f"insert {to_ds(interval[0])} - {to_ds(interval[1])}" + return "" + + +def _justify_evaluation_model_info( + text: str, + length: int, + justify_direction: t.Literal["left", "right"] = "left", + dots_side: t.Literal["left", "right"] = "left", + prefix: str = "", + suffix: str = "", +) -> str: + """Format a model evaluation info string by justifying and truncating if needed. + + Args: + text: The string to format + length: The desired number of characters in the returned string + justify_direction: The justification direction ("left" or "l" or "right" or "r") + dots_side: The side of the dots if truncation is needed ("left" or "l" or "right" or "r") + prefix: A prefix to add to the returned string + suffix: A suffix to add to the returned string + + Returns: + The justified string, truncated with "..." 
if needed + """ + full_text = f"{prefix}{text}{suffix}" + if len(full_text) <= length: + return ( + full_text.ljust(length) + if justify_direction.startswith("l") + else full_text.rjust(length) + ) + + trunc_length = length - len(prefix) - len(suffix) + truncated_text = ( + "..." + text[-(trunc_length - 3) :] + if dots_side.startswith("l") + else text[: (trunc_length - 3)] + "..." + ) + return f"{prefix}{truncated_text}{suffix}" + + +def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str: + if snapshot.is_audit: + return "run standalone audit" + if snapshot.is_model and snapshot.model.kind.is_external: + return "run external audits" + if snapshot.model.kind.is_seed: + return "insert seed file" + if snapshot.model.kind.is_full: + return "full refresh" + if snapshot.model.kind.is_view: + return "recreate view" + if snapshot.model.kind.is_incremental_by_unique_key: + return "insert/update rows" + if snapshot.model.kind.is_incremental_by_partition: + return "insert partitions" + + return interval_info if interval_info else "" diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 6073a9f98d..0052cb785b 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -7,6 +7,7 @@ from sqlmesh.core.console import Console, get_console from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements from sqlmesh.core.macros import RuntimeStage +from sqlmesh.core.model.definition import AuditResult from sqlmesh.core.node import IntervalUnit from sqlmesh.core.notification_target import ( NotificationEvent, @@ -166,7 +167,7 @@ def evaluate( deployability_index: DeployabilityIndex, batch_index: int, **kwargs: t.Any, - ) -> None: + ) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]: """Evaluate a snapshot and add the processed interval to the state sync. 
Args: @@ -178,6 +179,9 @@ def evaluate( batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it auto_restatement_enabled: Whether to enable auto restatements. kwargs: Additional kwargs to pass to the renderer. + + Returns: + Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn. """ validate_date_range(start, end) @@ -207,6 +211,7 @@ def evaluate( ) audit_errors_to_raise: t.List[AuditError] = [] + audit_errors_to_warn: t.List[AuditError] = [] for audit_result in (result for result in audit_results if result.count): error = AuditError( audit_name=audit_result.audit.name, @@ -224,15 +229,13 @@ def evaluate( if audit_result.blocking: audit_errors_to_raise.append(error) else: - get_console().log_warning( - f"\n{error}.", - long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}", - ) + audit_errors_to_warn.append(error) if audit_errors_to_raise: raise NodeAuditsErrors(audit_errors_to_raise) self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable) + return audit_results, audit_errors_to_warn def run( self, @@ -463,10 +466,12 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_start_ts = now_timestamp() evaluation_duration_ms: t.Optional[int] = None + audit_results: t.List[AuditResult] = [] + audit_errors_to_warn: t.List[AuditError] = [] try: assert execution_time # mypy assert deployability_index # mypy - self.evaluate( + audit_results, audit_errors_to_warn = self.evaluate( snapshot=snapshot, start=start, end=end, @@ -474,10 +479,29 @@ def evaluate_node(node: SchedulingUnit) -> None: deployability_index=deployability_index, batch_index=batch_idx, ) + + for audit_error in audit_errors_to_warn: + display_name = snapshot.display_name( + environment_naming_info, + self.default_catalog, + self.snapshot_evaluator.adapter.dialect, + ) + self.console.log_warning( + f"\n{display_name}: {audit_error}.", + f"{audit_error}. 
Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}", + ) + evaluation_duration_ms = now_timestamp() - execution_start_ts finally: + num_audits = len(audit_results) + num_audits_failed = sum(1 for result in audit_results if result.count) self.console.update_snapshot_evaluation_progress( - snapshot, batch_idx, evaluation_duration_ms + snapshot, + batched_intervals[snapshot][batch_idx], + batch_idx, + evaluation_duration_ms, + num_audits - num_audits_failed, + num_audits_failed, ) try: diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 920e165ba5..f208001904 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1559,6 +1559,15 @@ def display_name( Returns the model name as a qualified view name. This is just used for presenting information back to the user and `qualified_view_name` should be used when wanting a view name in all other cases. + + Args: + snapshot_info_like: The snapshot info object to get the display name for + environment_naming_info: Environment naming info to use for display name formatting + default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name. 
def assert_physical_layer_updated(result) -> None:
    """Assert that the CLI output reports the physical layer as updated."""
    assert "Physical layer updated" in result.output


def assert_model_batches_executed(result) -> None:
    """Assert that the CLI output reports that model batches were executed."""
    assert "Model batches executed" in result.output


def assert_virtual_layer_updated(result) -> None:
    """Assert that the CLI output reports the virtual layer as updated."""
    assert "Virtual layer updated" in result.output


def assert_backfill_success(result) -> None:
    """Assert that a backfill completed end to end.

    A successful backfill reports, in order: the physical layer update, the
    execution of model batches, and the virtual layer update.
    """
    for check in (
        assert_physical_layer_updated,
        assert_model_batches_executed,
        assert_virtual_layer_updated,
    ):
        check(result)


def assert_virtual_update(result) -> None:
    """Assert that the CLI output reports a virtual update was executed."""
    assert "Virtual Update executed" in result.output
result.output assert_model_batches_executed(result) - assert_target_env_updated(result) + assert_virtual_layer_updated(result) @pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"]) @@ -268,7 +268,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag): ) assert result.exit_code == 0 assert_virtual_update(result) - assert "Model batches executed successfully" not in result.output + assert "Model batches executed" not in result.output def test_plan_auto_apply(runner, tmp_path): @@ -282,7 +282,7 @@ def test_plan_auto_apply(runner, tmp_path): # confirm verbose output not present assert "sqlmesh_example.seed_model created" not in result.output - assert "sqlmesh_example.seed_model promoted" not in result.output + assert "sqlmesh_example.seed_model updated" not in result.output def test_plan_verbose(runner, tmp_path): @@ -293,8 +293,8 @@ def test_plan_verbose(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--verbose"], input="y\n" ) assert_plan_success(result) - assert "sqlmesh_example.seed_model created" in result.output - assert "sqlmesh_example.seed_model promoted" in result.output + assert "sqlmesh_example.seed_model created" in result.output + assert "sqlmesh_example.seed_model promoted" in result.output def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path): @@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev2", "dev", initialize=False) - assert_target_env_updated(result) + assert_virtual_layer_updated(result) assert_virtual_update(result) @@ -495,9 +495,9 @@ def test_plan_dev_no_prompts(runner, tmp_path): cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "dev", "--no-prompts"] ) assert "Apply - Backfill Tables [y/n]: " in result.output - assert "Model versions created successfully" not in result.output - assert "Model batches executed successfully" not in result.output - assert "The target environment has been 
updated successfully" not in result.output + assert "Physical layer updated" not in result.output + assert "Model batches executed" not in result.output + assert "The target environment has been updated" not in result.output def test_plan_dev_auto_apply(runner, tmp_path): @@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path): ) assert result.exit_code == 0 assert_new_env(result, "dev", initialize=False) - assert_target_env_updated(result) + assert_virtual_layer_updated(result) assert_virtual_update(result) @@ -552,8 +552,8 @@ def test_plan_nonbreaking(runner, tmp_path): assert "+ 'a' AS new_col" in result.output assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output - assert "sqlmesh_example.incremental_model evaluated in" in result.output - assert "sqlmesh_example.full_model evaluated in" not in result.output + assert "sqlmesh_example.incremental_model [insert" in result.output + assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output assert_backfill_success(result) @@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path): assert result.exit_code == 0 assert "+ item_id + 1 AS item_id," in result.output assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output - assert "sqlmesh_example.full_model evaluated in" in result.output - assert "sqlmesh_example.incremental_model evaluated in" not in result.output + assert "sqlmesh_example.full_model [full refresh" in result.output + assert "sqlmesh_example.incremental_model [insert" not in result.output assert_backfill_success(result) @@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path): assert "+ item_id + 1 AS item_id," not in result.output assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model evaluated in" in 
result.output - assert "sqlmesh_example__dev.full_model evaluated in" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) @@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path): "Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output ) # only incremental_model backfilled - assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output - assert "sqlmesh_example__dev.full_model evaluated in" not in result.output + assert "sqlmesh_example__dev.incremental_model [insert" in result.output + assert "sqlmesh_example__dev.full_model [full refresh" not in result.output assert_backfill_success(result) diff --git a/tests/core/test_context.py b/tests/core/test_context.py index b2c5090269..a2b7c56005 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode(): plan = context.plan(auto_apply=True, no_prompts=True) new_snapshot = next(iter(plan.context_diff.new_snapshots.values())) - assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed." + assert ( + mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed." 
+ ) # Even though there are two builtin audits referenced in the above definition, we only # store the one that overrides `blocking` in the snapshot; the other one isn't needed @@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog log = caplog.text assert "'not_null' audit error:" in log assert "'at_least_one_non_blocking' audit error:" in log - assert "Target environment updated successfully" in stdout + assert "Virtual layer updated" in stdout def test_environment_statements(tmp_path: pathlib.Path): diff --git a/tests/integrations/jupyter/test_magics.py b/tests/integrations/jupyter/test_magics.py index 95ce610286..9519668ab3 100644 --- a/tests/integrations/jupyter/test_magics.py +++ b/tests/integrations/jupyter/test_magics.py @@ -291,7 +291,7 @@ def test_plan( # TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for # the models and how long it took - assert len(output.stdout.strip().split("\n")) == 24 + assert len(output.stdout.strip().split("\n")) == 46 assert not output.stderr assert len(output.outputs) == 4 text_output = convert_all_html_output_to_text(output) @@ -299,14 +299,14 @@ def test_plan( # This has minor differences between CI/CD and local. assert "[2K" in text_output[0] assert text_output[1].startswith( - "Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" + "Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%" ) # TODO: Is this what we expect? 
assert text_output[2] == "" - assert text_output[3] == "Target environment updated successfully" + assert text_output[3] == "✔ Virtual layer updated" assert convert_all_html_output_to_tags(output) == [ ["pre", "span"], - ["pre"] + ["span"] * 4, + ["pre"] + ["span"] * 5, ["pre"], ["pre", "span"], ] @@ -326,7 +326,7 @@ def test_run_dag( assert not output.stderr assert len(output.outputs) == 2 assert convert_all_html_output_to_text(output) == [ - "Model batches executed successfully", + "✔ Model batches executed", "Run finished for environment 'prod'", ] assert get_all_html_output(output) == [ @@ -337,7 +337,7 @@ def test_run_dag( h( "span", {"style": SUCCESS_STYLE}, - "Model batches executed successfully", + "✔ Model batches executed", autoescape=False, ), autoescape=False, diff --git a/web/server/console.py b/web/server/console.py index 3112de3f14..9deb9e1137 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -7,7 +7,7 @@ from fastapi.encoders import jsonable_encoder from sse_starlette.sse import ServerSentEvent - +from sqlmesh.core.snapshot.definition import Interval from sqlmesh.core.console import TerminalConsole from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.plan.definition import EvaluatablePlan @@ -91,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None: def start_evaluation_progress( self, - batches: t.Dict[Snapshot, int], + batch_sizes: t.Dict[Snapshot, int], environment_naming_info: EnvironmentNamingInfo, default_catalog: t.Optional[str], ) -> None: @@ -104,7 +104,7 @@ def start_evaluation_progress( name=snapshot.name, view_name=snapshot.display_name(environment_naming_info, default_catalog), ) - for snapshot, total_tasks in batches.items() + for snapshot, total_tasks in batch_sizes.items() } self.plan_apply_stage_tracker.add_stage( models.PlanStage.backfill, @@ -123,7 +123,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: self.log_event_plan_apply() def 
update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] + self, + snapshot: Snapshot, + interval: Interval, + batch_idx: int, + duration_ms: t.Optional[int], + num_audits_passed: int, + num_audits_failed: int, ) -> None: if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill: task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name]