Skip to content

Commit 4c64e9e

Browse files
committed
Filter snapshots to promote in environment
1 parent 8dd37dd commit 4c64e9e

4 files changed

Lines changed: 20 additions & 58 deletions

File tree

sqlmesh/core/console.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,7 @@ def start_promotion_progress(
175175
"""Indicates that a new snapshot promotion progress has begun."""
176176

177177
@abc.abstractmethod
178-
def update_promotion_progress(
179-
self,
180-
snapshot: SnapshotInfoLike,
181-
promoted: bool,
182-
) -> None:
178+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
183179
"""Update the snapshot promotion progress."""
184180

185181
@abc.abstractmethod
@@ -473,11 +469,7 @@ def start_promotion_progress(
473469
) -> None:
474470
pass
475471

476-
def update_promotion_progress(
477-
self,
478-
snapshot: SnapshotInfoLike,
479-
promoted: bool,
480-
) -> None:
472+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
481473
pass
482474

483475
def stop_promotion_progress(self, success: bool = True) -> None:
@@ -953,11 +945,7 @@ def start_promotion_progress(
953945
self.environment_naming_info = environment_naming_info
954946
self.default_catalog = default_catalog
955947

956-
def update_promotion_progress(
957-
self,
958-
snapshot: SnapshotInfoLike,
959-
promoted: bool,
960-
) -> None:
948+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
961949
"""Update the snapshot promotion progress."""
962950
if self.promotion_progress is not None and self.promotion_task is not None:
963951
if self.verbosity >= Verbosity.VERBOSE:
@@ -2798,11 +2786,7 @@ def start_promotion_progress(
27982786
self.promotion_status = (0, total_tasks)
27992787
print(f"Virtually Updating '{environment_naming_info.name}'")
28002788

2801-
def update_promotion_progress(
2802-
self,
2803-
snapshot: SnapshotInfoLike,
2804-
promoted: bool,
2805-
) -> None:
2789+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
28062790
"""Update the snapshot promotion progress."""
28072791
num_promotions, total_promotions = self.promotion_status
28082792
num_promotions += 1
@@ -2933,11 +2917,7 @@ def start_promotion_progress(
29332917
) -> None:
29342918
self._write(f"Starting promotion for {total_tasks} snapshots")
29352919

2936-
def update_promotion_progress(
2937-
self,
2938-
snapshot: SnapshotInfoLike,
2939-
promoted: bool,
2940-
) -> None:
2920+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
29412921
self._write(f"Promoting {snapshot.name}")
29422922

29432923
def stop_promotion_progress(self, success: bool = True) -> None:

sqlmesh/core/environment.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,20 @@ def promoted_snapshot_id_dicts(self) -> t.List[dict]:
164164

165165
@property
166166
def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
167+
def has_virtual_view(s: SnapshotTableInfo) -> bool:
168+
return (
169+
s.is_model and s.model_kind_name is not None and not s.model_kind_name.is_symbolic
170+
)
171+
167172
if self.promoted_snapshot_ids is None:
168-
return self.snapshots
173+
return [s for s in self.snapshots if has_virtual_view(s)]
169174

170175
promoted_snapshot_ids = set(self.promoted_snapshot_ids)
171-
return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
176+
return [
177+
s
178+
for s in self.snapshots
179+
if s.snapshot_id in promoted_snapshot_ids and has_virtual_view(s)
180+
]
172181

173182
@property
174183
def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:

sqlmesh/core/plan/evaluator.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -361,14 +361,8 @@ def _update_views(
361361

362362
environment = plan.environment
363363

364-
# progress bar should only show snapshots that have a virtual layer view
365-
snapshots_with_virtual_views = [
366-
s.snapshot_id
367-
for s in [*promotion_result.added, *promotion_result.removed]
368-
if s.is_model and s.model_kind_name and not s.model_kind_name.is_symbolic
369-
]
370364
self.console.start_promotion_progress(
371-
len(snapshots_with_virtual_views),
365+
len(promotion_result.added) + len(promotion_result.removed),
372366
environment.naming_info,
373367
self.default_catalog,
374368
)
@@ -382,15 +376,13 @@ def _update_views(
382376
deployability_index=deployability_index,
383377
on_complete=lambda s: self.console.update_promotion_progress(s, True),
384378
snapshots=snapshots,
385-
snapshots_with_virtual_views=snapshots_with_virtual_views,
386379
)
387380
if promotion_result.removed_environment_naming_info:
388381
self._demote_snapshots(
389382
plan,
390383
promotion_result.removed,
391384
promotion_result.removed_environment_naming_info,
392385
on_complete=lambda s: self.console.update_promotion_progress(s, False),
393-
snapshots_with_virtual_views=snapshots_with_virtual_views,
394386
)
395387

396388
self.state_sync.finalize(environment)
@@ -406,7 +398,6 @@ def _promote_snapshots(
406398
snapshots: t.Dict[SnapshotId, Snapshot],
407399
deployability_index: t.Optional[DeployabilityIndex] = None,
408400
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
409-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
410401
) -> None:
411402
self.snapshot_evaluator.promote(
412403
target_snapshots,
@@ -423,7 +414,6 @@ def _promote_snapshots(
423414
environment_naming_info=environment_naming_info,
424415
deployability_index=deployability_index,
425416
on_complete=on_complete,
426-
snapshots_with_virtual_views=snapshots_with_virtual_views,
427417
)
428418

429419
def _demote_snapshots(
@@ -432,13 +422,11 @@ def _demote_snapshots(
432422
target_snapshots: t.Iterable[SnapshotTableInfo],
433423
environment_naming_info: EnvironmentNamingInfo,
434424
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
435-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
436425
) -> None:
437426
self.snapshot_evaluator.demote(
438427
target_snapshots,
439428
environment_naming_info,
440429
on_complete=on_complete,
441-
snapshots_with_virtual_views=snapshots_with_virtual_views,
442430
)
443431

444432
def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]) -> None:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ def promote(
216216
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
217217
table_mapping: t.Optional[t.Dict[str, str]] = None,
218218
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
219-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
220219
) -> None:
221220
"""Promotes the given collection of snapshots in the target environment by replacing a corresponding
222221
view with a physical table associated with the given snapshot.
@@ -250,7 +249,6 @@ def promote(
250249
environment_naming_info=environment_naming_info,
251250
deployability_index=deployability_index, # type: ignore
252251
on_complete=on_complete,
253-
snapshots_with_virtual_views=snapshots_with_virtual_views,
254252
),
255253
self.ddl_concurrent_tasks,
256254
)
@@ -260,7 +258,6 @@ def demote(
260258
target_snapshots: t.Iterable[SnapshotInfoLike],
261259
environment_naming_info: EnvironmentNamingInfo,
262260
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
263-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
264261
) -> None:
265262
"""Demotes the given collection of snapshots in the target environment by removing its view.
266263
@@ -272,9 +269,7 @@ def demote(
272269
with self.concurrent_context():
273270
concurrent_apply_to_snapshots(
274271
target_snapshots,
275-
lambda s: self._demote_snapshot(
276-
s, environment_naming_info, on_complete, snapshots_with_virtual_views
277-
),
272+
lambda s: self._demote_snapshot(s, environment_naming_info, on_complete),
278273
self.ddl_concurrent_tasks,
279274
)
280275

@@ -923,7 +918,6 @@ def _promote_snapshot(
923918
execution_time: t.Optional[TimeLike] = None,
924919
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
925920
table_mapping: t.Optional[t.Dict[str, str]] = None,
926-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
927921
) -> None:
928922
if snapshot.is_model:
929923
adapter = self.adapter
@@ -950,31 +944,22 @@ def _promote_snapshot(
950944
)
951945
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
952946

953-
if (
954-
on_complete is not None
955-
and snapshots_with_virtual_views
956-
and snapshot.snapshot_id in snapshots_with_virtual_views
957-
):
947+
if on_complete is not None:
958948
on_complete(snapshot)
959949

960950
def _demote_snapshot(
961951
self,
962952
snapshot: SnapshotInfoLike,
963953
environment_naming_info: EnvironmentNamingInfo,
964954
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
965-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
966955
) -> None:
967956
adapter = self.adapter
968957
view_name = snapshot.qualified_view_name.for_environment(
969958
environment_naming_info, dialect=adapter.dialect
970959
)
971960
_evaluation_strategy(snapshot, adapter).demote(view_name)
972961

973-
if (
974-
on_complete is not None
975-
and snapshots_with_virtual_views
976-
and snapshot.snapshot_id in snapshots_with_virtual_views
977-
):
962+
if on_complete is not None:
978963
on_complete(snapshot)
979964

980965
def _cleanup_snapshot(

0 commit comments

Comments
 (0)