Skip to content

Commit aa973fa

Browse files
committed
Chore: Update the plan evaluator to use stages
1 parent e3bf211 commit aa973fa

11 files changed

Lines changed: 1204 additions & 556 deletions

File tree

sqlmesh/core/plan/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,5 @@
88
from sqlmesh.core.plan.evaluator import (
99
BuiltInPlanEvaluator as BuiltInPlanEvaluator,
1010
PlanEvaluator as PlanEvaluator,
11-
update_intervals_for_new_snapshots as update_intervals_for_new_snapshots,
1211
)
1312
from sqlmesh.core.plan.explainer import PlanExplainer as PlanExplainer

sqlmesh/core/plan/evaluator.py

Lines changed: 193 additions & 362 deletions
Large diffs are not rendered by default.

sqlmesh/core/plan/explainer.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sqlmesh.core.snapshot.definition import (
1818
SnapshotInfoMixin,
1919
)
20-
from sqlmesh.utils import Verbosity, rich as srich
20+
from sqlmesh.utils import Verbosity, rich as srich, to_snake_case
2121
from sqlmesh.utils.date import to_ts
2222
from sqlmesh.utils.errors import SQLMeshError
2323

@@ -73,7 +73,7 @@ def __init__(
7373
def explain(self, stages: t.List[stages.PlanStage]) -> None:
7474
tree = Tree("[bold]Explained plan[/bold]")
7575
for stage in stages:
76-
handler_name = f"visit_{_to_snake_case(stage.__class__.__name__)}"
76+
handler_name = f"visit_{to_snake_case(stage.__class__.__name__)}"
7777
if not hasattr(self, handler_name):
7878
logger.error("Unexpected stage: %s", stage.__class__.__name__)
7979
continue
@@ -97,6 +97,9 @@ def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateSta
9797
"[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
9898
)
9999
for snapshot in stage.snapshots:
100+
if snapshot.snapshot_id not in stage.snapshots_with_missing_intervals:
101+
continue
102+
100103
is_deployable = (
101104
stage.deployability_index.is_deployable(snapshot)
102105
if self.environment_naming_info.name != c.PROD
@@ -233,11 +236,24 @@ def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage
233236
tree.add(self._limit_tree(demote_tree))
234237
return tree
235238

239+
def visit_create_snapshot_records_stage(
240+
self, stage: stages.CreateSnapshotRecordsStage
241+
) -> t.Optional[Tree]:
242+
return None
243+
236244
def visit_environment_record_update_stage(
237245
self, stage: stages.EnvironmentRecordUpdateStage
238246
) -> t.Optional[Tree]:
239247
return None
240248

249+
def visit_unpause_stage(self, stage: stages.UnpauseStage) -> t.Optional[Tree]:
250+
return None
251+
252+
def visit_finalize_environment_stage(
253+
self, stage: stages.FinalizeEnvironmentStage
254+
) -> t.Optional[Tree]:
255+
return None
256+
241257
def _display_name(self, snapshot: SnapshotInfoMixin) -> str:
242258
return snapshot.display_name(
243259
self.environment_naming_info,
@@ -273,9 +289,3 @@ def _get_explainer_console(
273289
verbosity=console.verbosity,
274290
console=console.console,
275291
)
276-
277-
278-
def _to_snake_case(name: str) -> str:
279-
return "".join(
280-
f"_{c.lower()}" if c.isupper() and idx != 0 else c.lower() for idx, c in enumerate(name)
281-
)

0 commit comments

Comments
 (0)