Skip to content

Commit 669d4fc

Browse files
committed
Add plan summary of executed model kinds
1 parent d7b6afa commit 669d4fc

5 files changed

Lines changed: 137 additions & 131 deletions

File tree

sqlmesh/core/console.py

Lines changed: 92 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
SnapshotId,
3636
SnapshotInfoLike,
3737
)
38-
from sqlmesh.core.snapshot.definition import Interval, Intervals
38+
from sqlmesh.core.snapshot.definition import Interval
3939
from sqlmesh.core.test import ModelTest
4040
from sqlmesh.utils import rich as srich
4141
from sqlmesh.utils import Verbosity
4242
from sqlmesh.utils.concurrency import NodeExecutionFailedError
43-
from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_tstz
43+
from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds, to_ds, to_datetime
4444
from sqlmesh.utils.errors import (
4545
PythonModelEvalError,
4646
NodeAuditsErrors,
@@ -74,7 +74,8 @@
7474

7575
PROGRESS_BAR_WIDTH = 40
7676
LINE_WRAP_WIDTH = 100
77-
GREEN_CHECK_MARK = "[green]\u2714[/green]"
77+
CHECK_MARK = "\u2714"
78+
GREEN_CHECK_MARK = f"[green]{CHECK_MARK}[/green]"
7879
RED_X_MARK = "\u274c"
7980

8081

@@ -533,8 +534,8 @@ class TerminalConsole(Console):
533534

534535
TABLE_DIFF_SOURCE_BLUE = "#0248ff"
535536

536-
EVAL_PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = {
537-
"batch": 9,
537+
PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = {
538+
"batch": 7,
538539
"name": 50,
539540
"annotation": 50,
540541
"duration": 8,
@@ -602,7 +603,7 @@ def start_evaluation_progress(
602603
"""Indicates that a new snapshot evaluation progress has begun."""
603604
if not self.evaluation_progress_live:
604605
self.evaluation_total_progress = make_progress_bar(
605-
"Evaluating model batches", self.console
606+
"Executing model batches", self.console
606607
)
607608

608609
self.evaluation_model_progress = Progress(
@@ -656,31 +657,42 @@ def update_snapshot_evaluation_progress(
656657
):
657658
total_batches = self.evaluation_model_batch_sizes[snapshot]
658659
batch_num = str(batch_idx + 1).rjust(len(str(total_batches)))
659-
batch = f"[{batch_num}/{total_batches}]".ljust(
660-
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["batch"]
661-
)
660+
batch = f"[{batch_num}/{total_batches}]".ljust(self.PROGRESS_BAR_COLUMN_WIDTHS["batch"])
662661

663662
if duration_ms:
664-
display_name = snapshot.display_name(
665-
self.environment_naming_info,
666-
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
667-
dialect=self.dialect,
668-
).ljust(self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["name"])
663+
display_name = _justify_evaluation_model_info(
664+
snapshot.display_name(
665+
self.environment_naming_info,
666+
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
667+
dialect=self.dialect,
668+
),
669+
self.PROGRESS_BAR_COLUMN_WIDTHS["name"],
670+
)
669671

670672
annotation = _create_evaluation_model_annotation(
671673
snapshot, _format_evaluation_model_interval(snapshot, interval)
672674
)
673-
675+
audits_str = ""
674676
if num_audits_passed:
675-
annotation += f", {num_audits_passed} audits pass"
677+
audits_str += f" {CHECK_MARK}{num_audits_passed}"
676678
if num_audits_failed:
677-
annotation += f", {num_audits_failed} audits fail {RED_X_MARK}"
678-
annotation = (annotation + "]").ljust(
679-
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["annotation"]
679+
audits_str += f" {RED_X_MARK}{num_audits_failed}"
680+
audits_str = f", audits{audits_str}" if audits_str else ""
681+
682+
annotation_width = self.PROGRESS_BAR_COLUMN_WIDTHS["annotation"]
683+
annotation = _justify_evaluation_model_info(
684+
annotation + audits_str,
685+
annotation_width
686+
if not num_audits_failed
687+
else annotation_width - 1, # -1 for RED_X_MARK's extra space
688+
dots_side="right",
689+
prefix=" \[",
690+
suffix="]",
680691
)
692+
annotation = annotation.replace(CHECK_MARK, GREEN_CHECK_MARK)
681693

682-
duration = f"{(duration_ms / 1000.0):.2f}s".rjust(
683-
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["duration"]
694+
duration = f"{(duration_ms / 1000.0):.2f}s".ljust(
695+
self.PROGRESS_BAR_COLUMN_WIDTHS["duration"]
684696
)
685697

686698
self.evaluation_progress_live.console.print(
@@ -701,7 +713,7 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
701713
if self.evaluation_progress_live:
702714
self.evaluation_progress_live.stop()
703715
if success:
704-
self.log_success("Model batches evaluated successfully")
716+
self.log_success(f"{GREEN_CHECK_MARK} Model batches executed successfully")
705717

706718
self.evaluation_progress_live = None
707719
self.evaluation_total_progress = None
@@ -720,13 +732,12 @@ def start_creation_progress(
720732
) -> None:
721733
"""Indicates that a new creation progress has begun."""
722734
if self.creation_progress is None:
723-
message = "Creating physical table" if total_tasks == 1 else "Creating physical tables"
724-
self.creation_progress = make_progress_bar(message, self.console)
735+
self.creation_progress = make_progress_bar("Updating physical layer", self.console)
725736

726737
self._print("")
727738
self.creation_progress.start()
728739
self.creation_task = self.creation_progress.add_task(
729-
"Creating physical tables...",
740+
"Updating physical layer...",
730741
total=total_tasks,
731742
)
732743

@@ -738,7 +749,7 @@ def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
738749
if self.creation_progress is not None and self.creation_task is not None:
739750
if self.verbosity >= Verbosity.VERBOSE:
740751
self.creation_progress.live.console.print(
741-
f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} [green]created[/green]"
752+
f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} [green]created[/green]"
742753
)
743754
self.creation_progress.update(self.creation_task, refresh=True, advance=1)
744755

@@ -749,7 +760,7 @@ def stop_creation_progress(self, success: bool = True) -> None:
749760
self.creation_progress.stop()
750761
self.creation_progress = None
751762
if success:
752-
self.log_success("\nModel versions created successfully")
763+
self.log_success(f"\n{GREEN_CHECK_MARK} Physical layer updated successfully")
753764

754765
self.environment_naming_info = EnvironmentNamingInfo()
755766
self.default_catalog = None
@@ -790,7 +801,7 @@ def start_promotion_progress(
790801
if self.promotion_progress is None:
791802
self.promotion_progress = Progress(
792803
TextColumn(
793-
f"[bold blue]Virtually updating '{environment_naming_info.name}' environment views",
804+
"[bold blue]Updating virtual layer",
794805
justify="right",
795806
),
796807
BarColumn(bar_width=PROGRESS_BAR_WIDTH),
@@ -816,7 +827,7 @@ def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool)
816827
check_mark = f"{GREEN_CHECK_MARK} " if promoted else " "
817828
action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]"
818829
self.promotion_progress.live.console.print(
819-
f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)} {action_str}"
830+
f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}"
820831
)
821832
self.promotion_progress.update(self.promotion_task, refresh=True, advance=1)
822833

@@ -827,7 +838,7 @@ def stop_promotion_progress(self, success: bool = True) -> None:
827838
self.promotion_progress.stop()
828839
self.promotion_progress = None
829840
if success:
830-
self.log_success("\nEnvironment views updated successfully")
841+
self.log_success(f"\n{GREEN_CHECK_MARK} Virtual layer updated successfully")
831842

832843
self.environment_naming_info = EnvironmentNamingInfo()
833844
self.default_catalog = None
@@ -2617,7 +2628,8 @@ def show_row_diff(
26172628
self._write(row_diff)
26182629

26192630

2620-
_CONSOLE: Console = NoopConsole()
2631+
# _CONSOLE: Console = NoopConsole()
2632+
_CONSOLE: Console = TerminalConsole()
26212633

26222634

26232635
def set_console(console: Console) -> None:
@@ -2733,111 +2745,70 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str:
27332745

27342746

27352747
def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str:
2736-
if snapshot.is_model:
2737-
# only include time if interval < 1 day
2738-
fmt_func = (
2739-
to_ds
2740-
if (interval[1] - interval[0]) >= datetime.timedelta(days=1).total_seconds()
2741-
else to_tstz
2742-
)
2743-
return (
2744-
f"insert {fmt_func(interval[0])} - {fmt_func(interval[1])}"
2745-
if snapshot.model.kind.is_incremental
2746-
or snapshot.model.kind.is_managed
2747-
or snapshot.model.kind.is_custom
2748-
else ""
2749-
)
2748+
if snapshot.is_model and (
2749+
snapshot.model.kind.is_incremental
2750+
or snapshot.model.kind.is_managed
2751+
or snapshot.model.kind.is_custom
2752+
):
2753+
# include time if interval < 1 day
2754+
if (interval[1] - interval[0]) < datetime.timedelta(days=1).total_seconds() * 1000:
2755+
return f"insert {to_ds(interval[0])} {to_datetime(interval[0]).strftime('%H:%M:%S')}-{to_datetime(interval[1]).strftime('%H:%M:%S')}"
2756+
return f"insert {to_ds(interval[0])} - {to_ds(interval[1])}"
27502757
return ""
27512758

27522759

2753-
def _create_evaluation_model_info(
2754-
batched_intervals: t.Dict[Snapshot, Intervals],
2755-
model_batch_sizes: t.Dict[Snapshot, int],
2756-
environment_naming_info: EnvironmentNamingInfo,
2757-
default_catalog: t.Optional[str],
2758-
dialect: t.Optional[DialectType],
2759-
) -> t.Tuple[t.Dict[Snapshot, t.Dict[str, t.Any]], t.Dict[str, int]]:
2760-
"""Creates model information dictionaries for model evaluation progress bar.
2760+
def _justify_evaluation_model_info(
2761+
text: str,
2762+
length: int,
2763+
justify_direction: str = "left",
2764+
dots_side: str = "left",
2765+
prefix: str = "",
2766+
suffix: str = "",
2767+
) -> str:
2768+
"""Format a model evaluation info string by justifying and truncating if needed.
27612769
2762-
Parameters:
2763-
batched_intervals: Dictionary mapping snapshot batches to their evaluation intervals
2764-
model_batch_sizes: Dictionary mapping snapshots to their batch sizes
2765-
environment_naming_info: Information about environment naming, needed to render model name
2766-
default_catalog: Optional default catalog name for rendering model name
2767-
dialect: Optional SQL dialect for rendering model name
2770+
Args:
2771+
text: The string to format
2772+
length: The desired number of characters in the returned string
2773+
justify_direction: The justification direction ("left" or "l" or "right" or "r")
2774+
dots_side: The side of the dots if truncation is needed ("left" or "l" or "right" or "r")
2775+
prefix: A prefix to add to the returned string
2776+
suffix: A suffix to add to the returned string
27682777
27692778
Returns:
2770-
A tuple containing:
2771-
- Dictionary mapping snapshots to their model's display information
2772-
- Dictionary of output field names to column widths
2779+
The justified string, truncated with "..." if needed
27732780
"""
2774-
model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {}
2775-
model_column_widths = {}
2776-
model_column_widths["display_name"] = 0
2777-
model_column_widths["annotation"] = 0
2778-
2779-
for snapshot in batched_intervals:
2780-
model_info[snapshot] = {}
2781-
model_info[snapshot]["display_name"] = snapshot.display_name(
2782-
environment_naming_info, default_catalog, dialect=dialect
2783-
)
2784-
model_column_widths["display_name"] = max(
2785-
model_column_widths["display_name"], len(model_info[snapshot]["display_name"])
2786-
)
2787-
2788-
# The annotation includes audit results. We cannot build the audits result string
2789-
# until after evaluation occurs, but we must determine the annotation column width here.
2790-
# Therefore, we add enough padding for the longest possible audits result string.
2791-
audit_pad = 0
2792-
if snapshot.is_model and snapshot.model.audits:
2793-
num_audits = len(snapshot.model.audits_with_args)
2794-
num_nonblocking_audits = sum(
2795-
1
2796-
for audit in snapshot.model.audits_with_args
2797-
if not audit[0].blocking
2798-
or ("blocking" in audit[1] and audit[1]["blocking"] == exp.false())
2799-
)
2800-
# make enough room for all audits to pass
2801-
audit_pad = len(f", {str(num_audits)} audits passed")
2802-
if num_nonblocking_audits:
2803-
# and add enough room for all nonblocking audits to fail
2804-
audit_pad += len(f", {str(num_nonblocking_audits)} audits failed X") # red X
2805-
audit_pad += 1 # closing bracket
2806-
2807-
model_info[snapshot]["annotation"] = [
2808-
_create_evaluation_model_annotation(
2809-
snapshot,
2810-
_format_evaluation_model_interval(snapshot, interval),
2811-
)
2812-
for interval in batched_intervals[snapshot]
2813-
]
2814-
model_column_widths["annotation"] = max(
2815-
model_column_widths["annotation"],
2816-
max(len(annotation) for annotation in model_info[snapshot]["annotation"]) + audit_pad,
2781+
full_text = f"{prefix}{text}{suffix}"
2782+
if len(full_text) <= length:
2783+
return (
2784+
full_text.ljust(length)
2785+
if justify_direction.startswith("l")
2786+
else full_text.rjust(length)
28172787
)
28182788

2819-
model_column_widths["batch"] = 5 # number characters in default "[1/1]"
2820-
# do we need space for more than one digit?
2821-
if any(size > 9 for size in model_batch_sizes.values()):
2822-
model_column_widths["batch"] = (
2823-
max(len(str(size)) for size in model_batch_sizes.values()) * 2
2824-
) + 3 # brackets and slash
2825-
2826-
return model_info, model_column_widths
2789+
trunc_length = length - len(prefix) - len(suffix)
2790+
truncated_text = (
2791+
"..." + text[-(trunc_length - 3) :]
2792+
if dots_side.startswith("l")
2793+
else text[: (trunc_length - 3)] + "..."
2794+
)
2795+
return f"{prefix}{truncated_text}{suffix}"
28272796

28282797

28292798
def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
2830-
if snapshot.is_audit or (snapshot.is_model and snapshot.model.kind.is_external):
2831-
return " \[run audits"
2799+
if snapshot.is_audit:
2800+
return "run standalone audit"
2801+
if snapshot.is_model and snapshot.model.kind.is_external:
2802+
return "run external model audits"
28322803
if snapshot.model.kind.is_seed:
2833-
return " \[insert from seed file"
2804+
return "insert from seed file"
28342805
if snapshot.model.kind.is_full:
2835-
return " \[full refresh"
2806+
return "full refresh"
28362807
if snapshot.model.kind.is_view:
2837-
return " \[recreate view"
2808+
return "recreate view"
28382809
if snapshot.model.kind.is_incremental_by_unique_key:
2839-
return " \[insert or update rows"
2810+
return "insert or update rows"
28402811
if snapshot.model.kind.is_incremental_by_partition:
2841-
return " \[insert partition"
2812+
return "insert partition"
28422813

2843-
return f" \[{interval_info}" if interval_info else ""
2814+
return interval_info if interval_info else ""

sqlmesh/core/plan/evaluator.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,41 @@ def evaluate(
160160
if not plan.requires_backfill:
161161
self.console.log_success("Virtual Update executed successfully")
162162

163+
model_kind_counts: t.Dict[str, int] = {}
164+
audit_counts: t.Dict[str, int] = {}
165+
for snapshot in snapshots.values():
166+
if snapshot.name in all_names:
167+
if snapshot.is_audit:
168+
audit_counts["standalone"] = audit_counts.get("standalone", 0) + 1
169+
if (
170+
snapshot.is_model
171+
and snapshot.model_kind_name
172+
and snapshot.model.kind
173+
and not snapshot.model.kind.is_external
174+
and not snapshot.model.kind.is_embedded
175+
):
176+
kind_name = snapshot.model_kind_name
177+
model_kind_counts[kind_name] = model_kind_counts.get(kind_name, 0) + 1
178+
if snapshot.is_model and snapshot.model.audits:
179+
if snapshot.model.kind.is_external:
180+
audit_counts["EXTERNAL model"] = audit_counts.get(
181+
"EXTERNAL model", 0
182+
) + len(snapshot.model.audits)
183+
else:
184+
audit_counts["model"] = audit_counts.get("model", 0) + len(
185+
snapshot.model.audits
186+
)
187+
188+
summary_str = ", ".join(
189+
[f"{v} {k} model{'s' if v > 1 else ''}" for k, v in model_kind_counts.items()]
190+
)
191+
for audit_type in ["EXTERNAL model", "model", "standalone"]:
192+
if audit_type in audit_counts:
193+
count = audit_counts[audit_type]
194+
summary_str += f", {count} {audit_type} audit{'s' if count > 1 else ''}"
195+
if summary_str:
196+
self.console.log_status_update(f"Plan applied for {summary_str}")
197+
163198
execute_environment_statements(
164199
adapter=self.snapshot_evaluator.adapter,
165200
environment_statements=plan.environment_statements or [],

0 commit comments

Comments
 (0)