Skip to content

Commit aa678f6

Browse files
committed
Only include snapshots with virtual layer views in promotion progress bar
1 parent 254f7b8 commit aa678f6

5 files changed

Lines changed: 132 additions & 63 deletions

File tree

sqlmesh/core/console.py

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,12 @@ def start_promotion_progress(
172172
"""Indicates that a new snapshot promotion progress has begun."""
173173

174174
@abc.abstractmethod
175-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
175+
def update_promotion_progress(
176+
self,
177+
snapshot: SnapshotInfoLike,
178+
promoted: bool,
179+
snapshots_with_virtual_views: t.List[SnapshotId],
180+
) -> None:
176181
"""Update the snapshot promotion progress."""
177182

178183
@abc.abstractmethod
@@ -406,7 +411,12 @@ def start_promotion_progress(
406411
) -> None:
407412
pass
408413

409-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
414+
def update_promotion_progress(
415+
self,
416+
snapshot: SnapshotInfoLike,
417+
promoted: bool,
418+
snapshots_with_virtual_views: t.List[SnapshotId],
419+
) -> None:
410420
pass
411421

412422
def stop_promotion_progress(self, success: bool = True) -> None:
@@ -799,17 +809,7 @@ def start_promotion_progress(
799809
) -> None:
800810
"""Indicates that a new snapshot promotion progress has begun."""
801811
if self.promotion_progress is None:
802-
self.promotion_progress = Progress(
803-
TextColumn(
804-
"[bold blue]Updating virtual layer ", # space to align with other progress bars
805-
justify="right",
806-
),
807-
BarColumn(bar_width=PROGRESS_BAR_WIDTH),
808-
"[progress.percentage]{task.percentage:>3.1f}%",
809-
"•",
810-
TimeElapsedColumn(),
811-
console=self.console,
812-
)
812+
self.promotion_progress = make_progress_bar("Updating virtual layer ", self.console)
813813

814814
self.promotion_progress.start()
815815
self.promotion_task = self.promotion_progress.add_task(
@@ -820,14 +820,29 @@ def start_promotion_progress(
820820
self.environment_naming_info = environment_naming_info
821821
self.default_catalog = default_catalog
822822

823-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
823+
def update_promotion_progress(
824+
self,
825+
snapshot: SnapshotInfoLike,
826+
promoted: bool,
827+
snapshots_with_virtual_views: t.List[SnapshotId],
828+
) -> None:
824829
"""Update the snapshot promotion progress."""
825-
if self.promotion_progress is not None and self.promotion_task is not None:
830+
if (
831+
self.promotion_progress is not None
832+
and self.promotion_task is not None
833+
and snapshot.snapshot_id in snapshots_with_virtual_views
834+
):
826835
if self.verbosity >= Verbosity.VERBOSE:
827-
check_mark = f"{GREEN_CHECK_MARK} " if promoted else " "
828-
action_str = "[green]updated[/green]" if promoted else "[yellow]demoted[/yellow]"
836+
action_str = ""
837+
if promoted:
838+
action_str = (
839+
"[yellow]updated[/yellow]"
840+
if snapshot.previous_version
841+
else "[green]created[/green]"
842+
)
843+
action_str = action_str or "[red]dropped[/red]"
829844
self.promotion_progress.live.console.print(
830-
f"{check_mark}{snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}"
845+
f"{GREEN_CHECK_MARK} {snapshot.display_name(self.environment_naming_info, self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect).ljust(self.PROGRESS_BAR_COLUMN_WIDTHS['name'])} {action_str}"
831846
)
832847
self.promotion_progress.update(self.promotion_task, refresh=True, advance=1)
833848

@@ -2410,7 +2425,12 @@ def start_promotion_progress(
24102425
self.promotion_status = (0, total_tasks)
24112426
print(f"Virtually Updating '{environment_naming_info.name}'")
24122427

2413-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
2428+
def update_promotion_progress(
2429+
self,
2430+
snapshot: SnapshotInfoLike,
2431+
promoted: bool,
2432+
snapshots_with_virtual_views: t.List[SnapshotId],
2433+
) -> None:
24142434
"""Update the snapshot promotion progress."""
24152435
num_promotions, total_promotions = self.promotion_status
24162436
num_promotions += 1
@@ -2541,7 +2561,12 @@ def start_promotion_progress(
25412561
) -> None:
25422562
self._write(f"Starting promotion for {total_tasks} snapshots")
25432563

2544-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
2564+
def update_promotion_progress(
2565+
self,
2566+
snapshot: SnapshotInfoLike,
2567+
promoted: bool,
2568+
snapshots_with_virtual_views: t.List[SnapshotId],
2569+
) -> None:
25452570
self._write(f"Promoting {snapshot.name}")
25462571

25472572
def stop_promotion_progress(self, success: bool = True) -> None:
@@ -2628,8 +2653,7 @@ def show_row_diff(
26282653
self._write(row_diff)
26292654

26302655

2631-
# _CONSOLE: Console = NoopConsole()
2632-
_CONSOLE: Console = TerminalConsole()
2656+
_CONSOLE: Console = NoopConsole()
26332657

26342658

26352659
def set_console(console: Console) -> None:
@@ -2683,6 +2707,8 @@ def _format_missing_intervals(snapshot: Snapshot, missing: SnapshotIntervals) ->
26832707
if snapshot.is_incremental
26842708
else "recreate view"
26852709
if snapshot.is_view
2710+
else "run audits"
2711+
if snapshot.is_external
26862712
else "full refresh"
26872713
)
26882714

sqlmesh/core/plan/evaluator.py

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -160,40 +160,26 @@ def evaluate(
160160
if not plan.requires_backfill:
161161
self.console.log_success("Virtual Update executed successfully")
162162

163-
model_kind_counts: t.Dict[str, int] = {}
164-
audit_counts: t.Dict[str, int] = {}
165-
for snapshot in snapshots.values():
166-
if snapshot.name in all_names:
167-
if snapshot.is_audit:
168-
audit_counts["standalone"] = audit_counts.get("standalone", 0) + 1
169-
if (
170-
snapshot.is_model
171-
and snapshot.model_kind_name
172-
and snapshot.model.kind
173-
and not snapshot.model.kind.is_external
174-
and not snapshot.model.kind.is_embedded
175-
):
176-
kind_name = snapshot.model_kind_name
177-
model_kind_counts[kind_name] = model_kind_counts.get(kind_name, 0) + 1
178-
if snapshot.is_model and snapshot.model.audits:
179-
if snapshot.model.kind.is_external:
180-
audit_counts["EXTERNAL model"] = audit_counts.get(
181-
"EXTERNAL model", 0
182-
) + len(snapshot.model.audits)
183-
else:
184-
audit_counts["model"] = audit_counts.get("model", 0) + len(
185-
snapshot.model.audits
186-
)
187-
188-
summary_str = ", ".join(
189-
[f"{v} {k} model{'s' if v > 1 else ''}" for k, v in model_kind_counts.items()]
163+
snapshots_to_count: t.List[SnapshotId] = [
164+
*plan.directly_modified_snapshots,
165+
*plan.removed_snapshots,
166+
] + [
167+
snapshot_id
168+
for snapshot_ids in plan.indirectly_modified_snapshots.values()
169+
for snapshot_id in snapshot_ids
170+
]
171+
172+
removed_snapshots = [
173+
s
174+
for s in (plan.environment.previous_finalized_snapshots or [])
175+
if s.snapshot_id in snapshots_to_count
176+
]
177+
summary_msg = _create_plan_summary_msg(
178+
[*snapshots.values(), *removed_snapshots], snapshots_to_count
190179
)
191-
for audit_type in ["EXTERNAL model", "model", "standalone"]:
192-
if audit_type in audit_counts:
193-
count = audit_counts[audit_type]
194-
summary_str += f", {count} {audit_type} audit{'s' if count > 1 else ''}"
195-
if summary_str:
196-
self.console.log_status_update(f"Plan applied for {summary_str}")
180+
181+
if summary_msg:
182+
self.console.log_status_update(f"Plan applied for {summary_msg}")
197183

198184
execute_environment_statements(
199185
adapter=self.snapshot_evaluator.adapter,
@@ -396,8 +382,14 @@ def _update_views(
396382

397383
environment = plan.environment
398384

385+
# progress bar should only show snapshots that have a virtual layer view
386+
snapshots_with_virtual_views = [
387+
s.snapshot_id
388+
for s in [*promotion_result.added, *promotion_result.removed]
389+
if s.is_model and s.model_kind_name and not s.model_kind_name.is_symbolic
390+
]
399391
self.console.start_promotion_progress(
400-
len(promotion_result.added) + len(promotion_result.removed),
392+
len(snapshots_with_virtual_views),
401393
environment.naming_info,
402394
self.default_catalog,
403395
)
@@ -409,15 +401,19 @@ def _update_views(
409401
[snapshots[s.snapshot_id] for s in promotion_result.added],
410402
environment.naming_info,
411403
deployability_index=deployability_index,
412-
on_complete=lambda s: self.console.update_promotion_progress(s, True),
404+
on_complete=lambda s: self.console.update_promotion_progress(
405+
s, True, snapshots_with_virtual_views
406+
),
413407
snapshots=snapshots,
414408
)
415409
if promotion_result.removed_environment_naming_info:
416410
self._demote_snapshots(
417411
plan,
418412
promotion_result.removed,
419413
promotion_result.removed_environment_naming_info,
420-
on_complete=lambda s: self.console.update_promotion_progress(s, False),
414+
on_complete=lambda s: self.console.update_promotion_progress(
415+
s, False, snapshots_with_virtual_views
416+
),
421417
)
422418

423419
self.state_sync.finalize(environment)
@@ -772,3 +768,46 @@ def update_intervals_for_new_snapshots(
772768

773769
if snapshots_intervals:
774770
state_sync.add_snapshots_intervals(snapshots_intervals)
771+
772+
773+
def _create_plan_summary_msg(
774+
snapshots: t.Iterable[SnapshotInfoLike], snapshots_to_count: t.List[SnapshotId]
775+
) -> str:
776+
model_kind_counts: t.Dict[str, int] = {}
777+
audit_counts: t.Dict[str, int] = {}
778+
for snapshot in snapshots:
779+
if snapshot.snapshot_id in snapshots_to_count:
780+
if snapshot.is_audit:
781+
audit_counts["standalone"] = audit_counts.get("standalone", 0) + 1
782+
if (
783+
snapshot.is_model
784+
and snapshot.model_kind_name
785+
and not snapshot.model_kind_name.is_symbolic
786+
):
787+
kind_name = snapshot.model_kind_name
788+
model_kind_counts[kind_name] = model_kind_counts.get(kind_name, 0) + 1
789+
if (
790+
snapshot.is_model
791+
# removed snapshot SnapshotTableInfos don't have `model`
792+
and hasattr(snapshot, "model")
793+
and snapshot.model
794+
and snapshot.model.audits
795+
):
796+
if snapshot.model.kind.is_external: # type: ignore
797+
audit_counts["EXTERNAL model"] = audit_counts.get("EXTERNAL model", 0) + len(
798+
snapshot.model.audits # type: ignore
799+
)
800+
else:
801+
audit_counts["model"] = audit_counts.get("model", 0) + len(
802+
snapshot.model.audits # type: ignore
803+
)
804+
805+
summary_str = ", ".join(
806+
[f"{v} {k} model{'s' if v > 1 else ''}" for k, v in model_kind_counts.items()]
807+
)
808+
for audit_type in ["EXTERNAL model", "model", "standalone"]:
809+
if audit_type in audit_counts:
810+
count = audit_counts[audit_type]
811+
summary_str += f", {count} {audit_type} audit{'s' if count > 1 else ''}"
812+
813+
return summary_str

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -944,8 +944,8 @@ def _promote_snapshot(
944944
)
945945
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
946946

947-
if on_complete is not None:
948-
on_complete(snapshot)
947+
if on_complete is not None:
948+
on_complete(snapshot)
949949

950950
def _demote_snapshot(
951951
self,

tests/cli/test_cli.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,6 @@ def test_plan_verbose(runner, tmp_path):
294294
)
295295
assert_plan_success(result)
296296
assert "sqlmesh_example.seed_model created" in result.output
297-
assert "sqlmesh_example.seed_model updated" in result.output
298297

299298

300299
def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path):

web/server/console.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from sqlmesh.core.console import TerminalConsole
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
14-
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
14+
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike, SnapshotId
1515
from sqlmesh.core.test import ModelTest
1616
from sqlmesh.utils.date import now_timestamp
1717
from web.server import models
@@ -170,7 +170,12 @@ def start_promotion_progress(
170170

171171
self.log_event_plan_apply()
172172

173-
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
173+
def update_promotion_progress(
174+
self,
175+
snapshot: SnapshotInfoLike,
176+
promoted: bool,
177+
snapshots_with_virtual_views: t.List[SnapshotId],
178+
) -> None:
174179
if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.promote:
175180
self.plan_apply_stage_tracker.promote.update(
176181
{"num_tasks": self.plan_apply_stage_tracker.promote.num_tasks + 1}

0 commit comments

Comments
 (0)