Skip to content

Commit 5d311d1

Browse files
committed
Filter snapshots to promote in environment
1 parent b982974 commit 5d311d1

4 files changed

Lines changed: 20 additions & 58 deletions

File tree

sqlmesh/core/console.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,7 @@ def start_promotion_progress(
246246
"""Indicates that a new snapshot promotion progress has begun."""
247247

248248
@abc.abstractmethod
249-
def update_promotion_progress(
250-
self,
251-
snapshot: SnapshotInfoLike,
252-
promoted: bool,
253-
) -> None:
249+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
254250
"""Update the snapshot promotion progress."""
255251

256252
@abc.abstractmethod
@@ -478,11 +474,7 @@ def start_promotion_progress(
478474
) -> None:
479475
pass
480476

481-
def update_promotion_progress(
482-
self,
483-
snapshot: SnapshotInfoLike,
484-
promoted: bool,
485-
) -> None:
477+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
486478
pass
487479

488480
def stop_promotion_progress(self, success: bool = True) -> None:
@@ -958,11 +950,7 @@ def start_promotion_progress(
958950
self.environment_naming_info = environment_naming_info
959951
self.default_catalog = default_catalog
960952

961-
def update_promotion_progress(
962-
self,
963-
snapshot: SnapshotInfoLike,
964-
promoted: bool,
965-
) -> None:
953+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
966954
"""Update the snapshot promotion progress."""
967955
if self.promotion_progress is not None and self.promotion_task is not None:
968956
if self.verbosity >= Verbosity.VERBOSE:
@@ -2833,11 +2821,7 @@ def start_promotion_progress(
28332821
self.promotion_status = (0, total_tasks)
28342822
print(f"Virtually Updating '{environment_naming_info.name}'")
28352823

2836-
def update_promotion_progress(
2837-
self,
2838-
snapshot: SnapshotInfoLike,
2839-
promoted: bool,
2840-
) -> None:
2824+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
28412825
"""Update the snapshot promotion progress."""
28422826
num_promotions, total_promotions = self.promotion_status
28432827
num_promotions += 1
@@ -2968,11 +2952,7 @@ def start_promotion_progress(
29682952
) -> None:
29692953
self._write(f"Starting promotion for {total_tasks} snapshots")
29702954

2971-
def update_promotion_progress(
2972-
self,
2973-
snapshot: SnapshotInfoLike,
2974-
promoted: bool,
2975-
) -> None:
2955+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
29762956
self._write(f"Promoting {snapshot.name}")
29772957

29782958
def stop_promotion_progress(self, success: bool = True) -> None:

sqlmesh/core/environment.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,20 @@ def promoted_snapshot_id_dicts(self) -> t.List[dict]:
169169

170170
@property
171171
def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
172+
def has_virtual_view(s: SnapshotTableInfo) -> bool:
173+
return (
174+
s.is_model and s.model_kind_name is not None and not s.model_kind_name.is_symbolic
175+
)
176+
172177
if self.promoted_snapshot_ids is None:
173-
return self.snapshots
178+
return [s for s in self.snapshots if has_virtual_view(s)]
174179

175180
promoted_snapshot_ids = set(self.promoted_snapshot_ids)
176-
return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
181+
return [
182+
s
183+
for s in self.snapshots
184+
if s.snapshot_id in promoted_snapshot_ids and has_virtual_view(s)
185+
]
177186

178187
@property
179188
def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:

sqlmesh/core/plan/evaluator.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -375,14 +375,8 @@ def _update_views(
375375

376376
environment = plan.environment
377377

378-
# progress bar should only show snapshots that have a virtual layer view
379-
snapshots_with_virtual_views = [
380-
s.snapshot_id
381-
for s in [*promotion_result.added, *promotion_result.removed]
382-
if s.is_model and s.model_kind_name and not s.model_kind_name.is_symbolic
383-
]
384378
self.console.start_promotion_progress(
385-
len(snapshots_with_virtual_views),
379+
len(promotion_result.added) + len(promotion_result.removed),
386380
environment.naming_info,
387381
self.default_catalog,
388382
)
@@ -396,15 +390,13 @@ def _update_views(
396390
deployability_index=deployability_index,
397391
on_complete=lambda s: self.console.update_promotion_progress(s, True),
398392
snapshots=snapshots,
399-
snapshots_with_virtual_views=snapshots_with_virtual_views,
400393
)
401394
if promotion_result.removed_environment_naming_info:
402395
self._demote_snapshots(
403396
plan,
404397
promotion_result.removed,
405398
promotion_result.removed_environment_naming_info,
406399
on_complete=lambda s: self.console.update_promotion_progress(s, False),
407-
snapshots_with_virtual_views=snapshots_with_virtual_views,
408400
)
409401

410402
self.state_sync.finalize(environment)
@@ -420,7 +412,6 @@ def _promote_snapshots(
420412
snapshots: t.Dict[SnapshotId, Snapshot],
421413
deployability_index: t.Optional[DeployabilityIndex] = None,
422414
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
423-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
424415
) -> None:
425416
self.snapshot_evaluator.promote(
426417
target_snapshots,
@@ -437,7 +428,6 @@ def _promote_snapshots(
437428
environment_naming_info=environment_naming_info,
438429
deployability_index=deployability_index,
439430
on_complete=on_complete,
440-
snapshots_with_virtual_views=snapshots_with_virtual_views,
441431
)
442432

443433
def _demote_snapshots(
@@ -446,13 +436,11 @@ def _demote_snapshots(
446436
target_snapshots: t.Iterable[SnapshotTableInfo],
447437
environment_naming_info: EnvironmentNamingInfo,
448438
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
449-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
450439
) -> None:
451440
self.snapshot_evaluator.demote(
452441
target_snapshots,
453442
environment_naming_info,
454443
on_complete=on_complete,
455-
snapshots_with_virtual_views=snapshots_with_virtual_views,
456444
)
457445

458446
def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]) -> None:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ def promote(
216216
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
217217
table_mapping: t.Optional[t.Dict[str, str]] = None,
218218
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
219-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
220219
) -> None:
221220
"""Promotes the given collection of snapshots in the target environment by replacing a corresponding
222221
view with a physical table associated with the given snapshot.
@@ -258,7 +257,6 @@ def promote(
258257
environment_naming_info=environment_naming_info,
259258
deployability_index=deployability_index, # type: ignore
260259
on_complete=on_complete,
261-
snapshots_with_virtual_views=snapshots_with_virtual_views,
262260
),
263261
self.ddl_concurrent_tasks,
264262
)
@@ -268,7 +266,6 @@ def demote(
268266
target_snapshots: t.Iterable[SnapshotInfoLike],
269267
environment_naming_info: EnvironmentNamingInfo,
270268
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
271-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
272269
) -> None:
273270
"""Demotes the given collection of snapshots in the target environment by removing its view.
274271
@@ -280,9 +277,7 @@ def demote(
280277
with self.concurrent_context():
281278
concurrent_apply_to_snapshots(
282279
target_snapshots,
283-
lambda s: self._demote_snapshot(
284-
s, environment_naming_info, on_complete, snapshots_with_virtual_views
285-
),
280+
lambda s: self._demote_snapshot(s, environment_naming_info, on_complete),
286281
self.ddl_concurrent_tasks,
287282
)
288283

@@ -948,7 +943,6 @@ def _promote_snapshot(
948943
execution_time: t.Optional[TimeLike] = None,
949944
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
950945
table_mapping: t.Optional[t.Dict[str, str]] = None,
951-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
952946
) -> None:
953947
if snapshot.is_model:
954948
adapter = (
@@ -979,19 +973,14 @@ def _promote_snapshot(
979973
)
980974
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
981975

982-
if (
983-
on_complete is not None
984-
and snapshots_with_virtual_views
985-
and snapshot.snapshot_id in snapshots_with_virtual_views
986-
):
976+
if on_complete is not None:
987977
on_complete(snapshot)
988978

989979
def _demote_snapshot(
990980
self,
991981
snapshot: SnapshotInfoLike,
992982
environment_naming_info: EnvironmentNamingInfo,
993983
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
994-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
995984
) -> None:
996985
adapter = (
997986
self.get_adapter(snapshot.model_gateway)
@@ -1003,11 +992,7 @@ def _demote_snapshot(
1003992
)
1004993
_evaluation_strategy(snapshot, adapter).demote(view_name)
1005994

1006-
if (
1007-
on_complete is not None
1008-
and snapshots_with_virtual_views
1009-
and snapshot.snapshot_id in snapshots_with_virtual_views
1010-
):
995+
if on_complete is not None:
1011996
on_complete(snapshot)
1012997

1013998
def _cleanup_snapshot(

0 commit comments

Comments
 (0)