Skip to content

Commit be9abb7

Browse files
committed
PR feedback
1 parent 7b367d3 commit be9abb7

7 files changed

Lines changed: 58 additions & 59 deletions

File tree

sqlmesh/core/console.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,7 @@ def start_promotion_progress(
172172
"""Indicates that a new snapshot promotion progress has begun."""
173173

174174
@abc.abstractmethod
175-
def update_promotion_progress(
176-
self,
177-
snapshot: SnapshotInfoLike,
178-
promoted: bool,
179-
) -> None:
175+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
180176
"""Update the snapshot promotion progress."""
181177

182178
@abc.abstractmethod
@@ -410,11 +406,7 @@ def start_promotion_progress(
410406
) -> None:
411407
pass
412408

413-
def update_promotion_progress(
414-
self,
415-
snapshot: SnapshotInfoLike,
416-
promoted: bool,
417-
) -> None:
409+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
418410
pass
419411

420412
def stop_promotion_progress(self, success: bool = True) -> None:
@@ -825,11 +817,7 @@ def start_promotion_progress(
825817
self.environment_naming_info = environment_naming_info
826818
self.default_catalog = default_catalog
827819

828-
def update_promotion_progress(
829-
self,
830-
snapshot: SnapshotInfoLike,
831-
promoted: bool,
832-
) -> None:
820+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
833821
"""Update the snapshot promotion progress."""
834822
if self.promotion_progress is not None and self.promotion_task is not None:
835823
if self.verbosity >= Verbosity.VERBOSE:
@@ -2424,11 +2412,7 @@ def start_promotion_progress(
24242412
self.promotion_status = (0, total_tasks)
24252413
print(f"Virtually Updating '{environment_naming_info.name}'")
24262414

2427-
def update_promotion_progress(
2428-
self,
2429-
snapshot: SnapshotInfoLike,
2430-
promoted: bool,
2431-
) -> None:
2415+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
24322416
"""Update the snapshot promotion progress."""
24332417
num_promotions, total_promotions = self.promotion_status
24342418
num_promotions += 1
@@ -2559,11 +2543,7 @@ def start_promotion_progress(
25592543
) -> None:
25602544
self._write(f"Starting promotion for {total_tasks} snapshots")
25612545

2562-
def update_promotion_progress(
2563-
self,
2564-
snapshot: SnapshotInfoLike,
2565-
promoted: bool,
2566-
) -> None:
2546+
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
25672547
self._write(f"Promoting {snapshot.name}")
25682548

25692549
def stop_promotion_progress(self, success: bool = True) -> None:

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,11 @@ def _update_views(
362362
environment = plan.environment
363363

364364
# progress bar should only show snapshots that have a virtual layer view
365-
snapshots_with_virtual_views = [
365+
snapshots_with_virtual_views = {
366366
s.snapshot_id
367367
for s in [*promotion_result.added, *promotion_result.removed]
368368
if s.is_model and s.model_kind_name and not s.model_kind_name.is_symbolic
369-
]
369+
}
370370
self.console.start_promotion_progress(
371371
len(snapshots_with_virtual_views),
372372
environment.naming_info,
@@ -379,18 +379,18 @@ def _update_views(
379379
plan,
380380
[snapshots[s.snapshot_id] for s in promotion_result.added],
381381
environment.naming_info,
382+
snapshots_with_virtual_views=snapshots_with_virtual_views or set(),
382383
deployability_index=deployability_index,
383384
on_complete=lambda s: self.console.update_promotion_progress(s, True),
384385
snapshots=snapshots,
385-
snapshots_with_virtual_views=snapshots_with_virtual_views,
386386
)
387387
if promotion_result.removed_environment_naming_info:
388388
self._demote_snapshots(
389389
plan,
390390
promotion_result.removed,
391391
promotion_result.removed_environment_naming_info,
392+
snapshots_with_virtual_views=snapshots_with_virtual_views or set(),
392393
on_complete=lambda s: self.console.update_promotion_progress(s, False),
393-
snapshots_with_virtual_views=snapshots_with_virtual_views,
394394
)
395395

396396
self.state_sync.finalize(environment)
@@ -404,9 +404,9 @@ def _promote_snapshots(
404404
target_snapshots: t.Iterable[Snapshot],
405405
environment_naming_info: EnvironmentNamingInfo,
406406
snapshots: t.Dict[SnapshotId, Snapshot],
407+
snapshots_with_virtual_views: t.Set[SnapshotId],
407408
deployability_index: t.Optional[DeployabilityIndex] = None,
408409
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
409-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
410410
) -> None:
411411
self.snapshot_evaluator.promote(
412412
target_snapshots,
@@ -431,8 +431,8 @@ def _demote_snapshots(
431431
plan: EvaluatablePlan,
432432
target_snapshots: t.Iterable[SnapshotTableInfo],
433433
environment_naming_info: EnvironmentNamingInfo,
434+
snapshots_with_virtual_views: t.Set[SnapshotId],
434435
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
435-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
436436
) -> None:
437437
self.snapshot_evaluator.demote(
438438
target_snapshots,

sqlmesh/core/snapshot/evaluator.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,14 @@ def promote(
209209
self,
210210
target_snapshots: t.Iterable[Snapshot],
211211
environment_naming_info: EnvironmentNamingInfo,
212+
snapshots_with_virtual_views: t.Set[SnapshotId],
212213
deployability_index: t.Optional[DeployabilityIndex] = None,
213214
start: t.Optional[TimeLike] = None,
214215
end: t.Optional[TimeLike] = None,
215216
execution_time: t.Optional[TimeLike] = None,
216217
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
217218
table_mapping: t.Optional[t.Dict[str, str]] = None,
218219
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
219-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
220220
) -> None:
221221
"""Promotes the given collection of snapshots in the target environment by replacing a corresponding
222222
view with a physical table associated with the given snapshot.
@@ -259,8 +259,8 @@ def demote(
259259
self,
260260
target_snapshots: t.Iterable[SnapshotInfoLike],
261261
environment_naming_info: EnvironmentNamingInfo,
262+
snapshots_with_virtual_views: t.Set[SnapshotId],
262263
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
263-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
264264
) -> None:
265265
"""Demotes the given collection of snapshots in the target environment by removing its view.
266266
@@ -273,7 +273,7 @@ def demote(
273273
concurrent_apply_to_snapshots(
274274
target_snapshots,
275275
lambda s: self._demote_snapshot(
276-
s, environment_naming_info, on_complete, snapshots_with_virtual_views
276+
s, environment_naming_info, snapshots_with_virtual_views, on_complete
277277
),
278278
self.ddl_concurrent_tasks,
279279
)
@@ -917,13 +917,13 @@ def _promote_snapshot(
917917
snapshot: Snapshot,
918918
environment_naming_info: EnvironmentNamingInfo,
919919
deployability_index: DeployabilityIndex,
920+
snapshots_with_virtual_views: t.Set[SnapshotId],
920921
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
921922
start: t.Optional[TimeLike] = None,
922923
end: t.Optional[TimeLike] = None,
923924
execution_time: t.Optional[TimeLike] = None,
924925
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
925926
table_mapping: t.Optional[t.Dict[str, str]] = None,
926-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
927927
) -> None:
928928
if snapshot.is_model:
929929
adapter = self.adapter
@@ -950,31 +950,23 @@ def _promote_snapshot(
950950
)
951951
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
952952

953-
if (
954-
on_complete is not None
955-
and snapshots_with_virtual_views
956-
and snapshot.snapshot_id in snapshots_with_virtual_views
957-
):
953+
if on_complete is not None and snapshot.snapshot_id in snapshots_with_virtual_views:
958954
on_complete(snapshot)
959955

960956
def _demote_snapshot(
961957
self,
962958
snapshot: SnapshotInfoLike,
963959
environment_naming_info: EnvironmentNamingInfo,
960+
snapshots_with_virtual_views: t.Set[SnapshotId],
964961
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
965-
snapshots_with_virtual_views: t.Optional[t.List[SnapshotId]] = None,
966962
) -> None:
967963
adapter = self.adapter
968964
view_name = snapshot.qualified_view_name.for_environment(
969965
environment_naming_info, dialect=adapter.dialect
970966
)
971967
_evaluation_strategy(snapshot, adapter).demote(view_name)
972968

973-
if (
974-
on_complete is not None
975-
and snapshots_with_virtual_views
976-
and snapshot.snapshot_id in snapshots_with_virtual_views
977-
):
969+
if on_complete is not None and snapshot.snapshot_id in snapshots_with_virtual_views:
978970
on_complete(snapshot)
979971

980972
def _cleanup_snapshot(

sqlmesh/engines/commands.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@ class EvaluateCommandPayload(PydanticModel):
4545
class PromoteCommandPayload(PydanticModel):
4646
snapshots: t.List[Snapshot]
4747
environment_naming_info: EnvironmentNamingInfo
48+
snapshots_with_virtual_views: t.Set[SnapshotId]
4849
deployability_index: DeployabilityIndex
4950

5051

5152
class DemoteCommandPayload(PydanticModel):
5253
snapshots: t.List[SnapshotTableInfo]
5354
environment_naming_info: EnvironmentNamingInfo
55+
snapshots_with_virtual_views: t.Set[SnapshotId]
5456

5557

5658
class CleanupCommandPayload(PydanticModel):
@@ -119,6 +121,7 @@ def promote(
119121
evaluator.promote(
120122
command_payload.snapshots,
121123
command_payload.environment_naming_info,
124+
command_payload.snapshots_with_virtual_views,
122125
deployability_index=command_payload.deployability_index,
123126
)
124127

@@ -131,6 +134,7 @@ def demote(
131134
evaluator.demote(
132135
command_payload.snapshots,
133136
command_payload.environment_naming_info,
137+
command_payload.snapshots_with_virtual_views,
134138
)
135139

136140

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,11 @@ def _create_update_views_tasks(
405405
create_views_task = self._create_snapshot_promotion_operator(
406406
[snapshots[x.snapshot_id] for x in request.environment.promoted_snapshots],
407407
environment_naming_info,
408+
{
409+
x.snapshot_id
410+
for x in request.environment.promoted_snapshots
411+
if x.is_model and x.model_kind_name and not x.model_kind_name.is_symbolic
412+
},
408413
request.ddl_concurrent_tasks,
409414
request.deployability_index,
410415
"snapshot_promotion_create_views",
@@ -414,6 +419,11 @@ def _create_update_views_tasks(
414419
delete_views_task = self._create_snapshot_demotion_operator(
415420
request.demoted_snapshots,
416421
environment_naming_info,
422+
{
423+
x.snapshot_id
424+
for x in request.demoted_snapshots
425+
if x.is_model and x.model_kind_name and not x.model_kind_name.is_symbolic
426+
},
417427
request.ddl_concurrent_tasks,
418428
"snapshot_promotion_delete_views",
419429
)
@@ -540,6 +550,7 @@ def _create_snapshot_promotion_operator(
540550
self,
541551
snapshots: t.List[Snapshot],
542552
environment_naming_info: EnvironmentNamingInfo,
553+
snapshots_with_virtual_views: t.Set[SnapshotId],
543554
ddl_concurrent_tasks: int,
544555
deployability_index: DeployabilityIndex,
545556
task_id: str,
@@ -549,6 +560,7 @@ def _create_snapshot_promotion_operator(
549560
target=targets.SnapshotPromotionTarget(
550561
snapshots=snapshots,
551562
environment_naming_info=environment_naming_info,
563+
snapshots_with_virtual_views=snapshots_with_virtual_views,
552564
ddl_concurrent_tasks=ddl_concurrent_tasks,
553565
deployability_index=deployability_index,
554566
),
@@ -559,6 +571,7 @@ def _create_snapshot_demotion_operator(
559571
self,
560572
snapshots: t.List[SnapshotTableInfo],
561573
environment_naming_info: EnvironmentNamingInfo,
574+
snapshots_with_virtual_views: t.Set[SnapshotId],
562575
ddl_concurrent_tasks: int,
563576
task_id: str,
564577
) -> BaseOperator:
@@ -567,6 +580,7 @@ def _create_snapshot_demotion_operator(
567580
target=targets.SnapshotDemotionTarget(
568581
snapshots=snapshots,
569582
environment_naming_info=environment_naming_info,
583+
snapshots_with_virtual_views=snapshots_with_virtual_views,
570584
ddl_concurrent_tasks=ddl_concurrent_tasks,
571585
),
572586
task_id=task_id,

sqlmesh/schedulers/airflow/operators/targets.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ class SnapshotPromotionTarget(PydanticModel, BaseTarget[commands.PromoteCommandP
188188
Args:
189189
snapshots: The list of snapshots that should be promoted in the target environment.
190190
environment_naming_info: Naming information for the target environment.
191+
snapshots_with_virtual_views: The set of snapshots with virtual layer views.
191192
ddl_concurrent_tasks: The number of concurrent tasks used for DDL
192193
operations (table / view creation, deletion, etc). Default: 1.
193194
deployability_index: Determines snapshots that are deployable in the context of this promotion.
@@ -200,13 +201,15 @@ class SnapshotPromotionTarget(PydanticModel, BaseTarget[commands.PromoteCommandP
200201

201202
snapshots: t.List[Snapshot]
202203
environment_naming_info: EnvironmentNamingInfo
204+
snapshots_with_virtual_views: t.Set[Snapshot]
203205
ddl_concurrent_tasks: int
204206
deployability_index: DeployabilityIndex
205207

206208
def _get_command_payload(self, context: Context) -> t.Optional[commands.PromoteCommandPayload]:
207209
return commands.PromoteCommandPayload(
208210
snapshots=self.snapshots,
209211
environment_naming_info=self.environment_naming_info,
212+
snapshots_with_virtual_views=self.snapshots_with_virtual_views,
210213
deployability_index=self.deployability_index,
211214
)
212215

@@ -228,12 +231,14 @@ class SnapshotDemotionTarget(PydanticModel, BaseTarget[commands.DemoteCommandPay
228231

229232
snapshots: t.List[SnapshotTableInfo]
230233
environment_naming_info: EnvironmentNamingInfo
234+
snapshots_with_virtual_views: t.Set[Snapshot]
231235
ddl_concurrent_tasks: int
232236

233237
def _get_command_payload(self, context: Context) -> t.Optional[commands.DemoteCommandPayload]:
234238
return commands.DemoteCommandPayload(
235239
snapshots=self.snapshots,
236240
environment_naming_info=self.environment_naming_info,
241+
snapshots_with_virtual_views=self.snapshots_with_virtual_views,
237242
)
238243

239244

0 commit comments

Comments
 (0)