Skip to content

Commit c6c1b02

Browse files
committed
Feat: Introduce plan explain mode
1 parent f6f441a commit c6c1b02

11 files changed

Lines changed: 636 additions & 63 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,13 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
457457
@click.option(
458458
"--diff-rendered",
459459
is_flag=True,
460-
help="Output text differences for the rendered versions of the models and standalone audits",
460+
help="Output text differences for the rendered versions of the models and standalone audits.",
461+
default=None,
462+
)
463+
@click.option(
464+
"--explain",
465+
is_flag=True,
466+
help="Explain the plan instead of applying it.",
461467
default=None,
462468
)
463469
@opt.verbose

sqlmesh/core/context.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
NotificationTarget,
9090
NotificationTargetManager,
9191
)
92-
from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals
92+
from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals, PlanExplainer
9393
from sqlmesh.core.plan.definition import UserProvidedFlags
9494
from sqlmesh.core.reference import ReferenceGraph
9595
from sqlmesh.core.scheduler import Scheduler, CompletionStatus
@@ -1215,6 +1215,7 @@ def plan(
12151215
run: t.Optional[bool] = None,
12161216
diff_rendered: t.Optional[bool] = None,
12171217
skip_linter: t.Optional[bool] = None,
1218+
explain: t.Optional[bool] = None,
12181219
) -> Plan:
12191220
"""Interactively creates a plan.
12201221
@@ -1260,6 +1261,7 @@ def plan(
12601261
run: Whether to run latest intervals as part of the plan application.
12611262
diff_rendered: Whether the diff should compare raw vs rendered models
12621263
skip_linter: Linter runs by default so this will skip it if enabled
1264+
explain: Whether to explain the plan instead of applying it.
12631265
12641266
Returns:
12651267
The populated Plan object.
@@ -1287,6 +1289,7 @@ def plan(
12871289
run=run,
12881290
diff_rendered=diff_rendered,
12891291
skip_linter=skip_linter,
1292+
explain=explain,
12901293
)
12911294

12921295
plan = plan_builder.build()
@@ -1296,6 +1299,9 @@ def plan(
12961299
# or if there are any uncategorized snapshots in the plan
12971300
no_prompts = False
12981301

1302+
if explain:
1303+
auto_apply = True
1304+
12991305
self.console.plan(
13001306
plan_builder,
13011307
auto_apply if auto_apply is not None else self.config.plan.auto_apply,
@@ -1332,6 +1338,7 @@ def plan_builder(
13321338
run: t.Optional[bool] = None,
13331339
diff_rendered: t.Optional[bool] = None,
13341340
skip_linter: t.Optional[bool] = None,
1341+
explain: t.Optional[bool] = None,
13351342
) -> PlanBuilder:
13361343
"""Creates a plan builder.
13371344
@@ -1548,6 +1555,7 @@ def plan_builder(
15481555
interval_end_per_model=max_interval_end_per_model,
15491556
console=self.console,
15501557
user_provided_flags=user_provided_flags,
1558+
explain=explain or False,
15511559
)
15521560

15531561
def apply(
@@ -1572,6 +1580,16 @@ def apply(
15721580
return
15731581
if plan.uncategorized:
15741582
raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1583+
1584+
if plan.explain:
1585+
explainer = PlanExplainer(
1586+
state_reader=self.state_reader,
1587+
default_catalog=self.default_catalog,
1588+
console=self.console,
1589+
)
1590+
explainer.evaluate(plan.to_evaluatable())
1591+
return
1592+
15751593
self.notification_target_manager.notify(
15761594
NotificationEvent.APPLY_START,
15771595
environment=plan.environment_naming_info.name,

sqlmesh/core/environment.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,19 @@ def summary(self) -> EnvironmentSummary:
229229
finalized_ts=self.finalized_ts,
230230
)
231231

232+
def can_partially_promote(self, existing_environment: Environment) -> bool:
    """Checks whether the existing environment qualifies for partial promotion into this one.

    With partial promotion, views for snapshots that are already promoted in the target
    environment don't need to be re-created.

    Args:
        existing_environment: The environment that currently exists in the target.

    Returns:
        True only when the existing environment is finalized, has not expired, has the same
        `gateway_managed` setting as this environment, and is the prod environment.
    """
    # An environment that was never finalized can't be relied upon.
    if not existing_environment.finalized_ts:
        return False

    if existing_environment.expired:
        return False

    # Both environments must agree on whether they are gateway-managed.
    if existing_environment.gateway_managed != self.gateway_managed:
        return False

    return existing_environment.name == c.PROD
244+
232245
def _convert_list_to_models_and_store(
233246
self, field: str, type_: t.Type[PydanticType]
234247
) -> t.Optional[t.List[PydanticType]]:

sqlmesh/core/plan/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@
1010
PlanEvaluator as PlanEvaluator,
1111
update_intervals_for_new_snapshots as update_intervals_for_new_snapshots,
1212
)
13+
from sqlmesh.core.plan.explainer import PlanExplainer as PlanExplainer

sqlmesh/core/plan/builder.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class PlanBuilder:
8383
environment state, or to use whatever snapshots are in the current environment state even if
8484
the environment is not finalized.
8585
interval_end_per_model: The mapping from model FQNs to target end dates.
86+
explain: Whether to explain the plan instead of applying it.
8687
"""
8788

8889
def __init__(
@@ -112,6 +113,7 @@ def __init__(
112113
enable_preview: bool = False,
113114
end_bounded: bool = False,
114115
ensure_finalized_snapshots: bool = False,
116+
explain: bool = False,
115117
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
116118
console: t.Optional[PlanBuilderConsole] = None,
117119
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
@@ -142,6 +144,7 @@ def __init__(
142144
self._console = console or get_console()
143145
self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {}
144146
self._user_provided_flags = user_provided_flags
147+
self._explain = explain
145148

146149
self._start = start
147150
if not self._start and (
@@ -273,6 +276,7 @@ def build(self) -> Plan:
273276
empty_backfill=self._empty_backfill,
274277
no_gaps=self._no_gaps,
275278
forward_only=self._forward_only,
279+
explain=self._explain,
276280
allow_destructive_models=t.cast(t.Set, self._allow_destructive_models),
277281
include_unmodified=self._include_unmodified,
278282
environment_ttl=self._environment_ttl,

sqlmesh/core/plan/definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class Plan(PydanticModel, frozen=True):
4646
include_unmodified: bool
4747
end_bounded: bool
4848
ensure_finalized_snapshots: bool
49+
explain: bool
4950

5051
environment_ttl: t.Optional[str] = None
5152
environment_naming_info: EnvironmentNamingInfo

sqlmesh/core/plan/evaluator.py

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
SnapshotCreationFailedError,
3737
)
3838
from sqlmesh.utils import CompletionStatus
39-
from sqlmesh.core.state_sync import StateSync
39+
from sqlmesh.core.state_sync import StateSync, StateReader
4040
from sqlmesh.core.state_sync.base import PromotionResult
4141
from sqlmesh.utils.concurrency import NodeExecutionFailedError
4242
from sqlmesh.utils.errors import PlanError
@@ -284,23 +284,7 @@ def _push(
284284
new_snapshots=plan.new_snapshots, plan_id=plan.plan_id
285285
)
286286

287-
promoted_snapshot_ids = (
288-
set(plan.environment.promoted_snapshot_ids)
289-
if plan.environment.promoted_snapshot_ids is not None
290-
else None
291-
)
292-
293-
def _should_create(s: Snapshot) -> bool:
294-
if not s.is_model or s.is_symbolic:
295-
return False
296-
# Only create tables for snapshots that we're planning to promote or that were selected for backfill
297-
return (
298-
plan.is_selected_for_backfill(s.name)
299-
or promoted_snapshot_ids is None
300-
or s.snapshot_id in promoted_snapshot_ids
301-
)
302-
303-
snapshots_to_create = [s for s in snapshots.values() if _should_create(s)]
287+
snapshots_to_create = get_snapshots_to_create(plan, snapshots)
304288

305289
completion_status = None
306290
progress_stopped = False
@@ -573,32 +557,7 @@ def _run_audits_for_metadata_snapshots(
573557
plan: EvaluatablePlan,
574558
new_snapshots: t.Dict[SnapshotId, Snapshot],
575559
) -> None:
576-
# Filter out snapshots that are not categorized as metadata changes on models
577-
metadata_snapshots = []
578-
for snapshot in new_snapshots.values():
579-
if not snapshot.is_metadata or not snapshot.is_model or not snapshot.evaluatable:
580-
continue
581-
582-
metadata_snapshots.append(snapshot)
583-
584-
# Bulk load all the previous snapshots
585-
previous_snapshots = self.state_sync.get_snapshots(
586-
[
587-
s.previous_version.snapshot_id(s.name)
588-
for s in metadata_snapshots
589-
if s.previous_version
590-
]
591-
).values()
592-
593-
# Check if any of the snapshots have modifications to the audits field by comparing the hashes
594-
audit_snapshots = {}
595-
for snapshot, previous_snapshot in zip(metadata_snapshots, previous_snapshots):
596-
new_audits_hash = snapshot.model.audit_metadata_hash()
597-
previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
598-
599-
if snapshot.model.audits and previous_audit_hash != new_audits_hash:
600-
audit_snapshots[snapshot.snapshot_id] = snapshot
601-
560+
audit_snapshots = get_audit_only_snapshots(new_snapshots, self.state_sync)
602561
if not audit_snapshots:
603562
return
604563

@@ -636,3 +595,52 @@ def update_intervals_for_new_snapshots(
636595

637596
if snapshots_intervals:
638597
state_sync.add_snapshots_intervals(snapshots_intervals)
598+
599+
600+
def get_audit_only_snapshots(
    new_snapshots: t.Dict[SnapshotId, Snapshot], state_reader: StateReader
) -> t.Dict[SnapshotId, Snapshot]:
    """Returns the new metadata-only model snapshots whose audits have changed.

    A snapshot is selected when it is a metadata change on an evaluatable model, has audits
    attached, and its audit metadata hash differs from the hash of its previous version.
    Snapshots without a previous version, or whose previous version can't be loaded from the
    state, are skipped since there is nothing to compare them against.

    Args:
        new_snapshots: The new snapshots to inspect, keyed by snapshot ID.
        state_reader: The state reader used to load the previous versions of the snapshots.

    Returns:
        The selected snapshots keyed by snapshot ID.
    """
    # Filter out snapshots that are not categorized as metadata changes on models
    metadata_snapshots = [
        s for s in new_snapshots.values() if s.is_metadata and s.is_model and s.evaluatable
    ]

    # Remember which previous snapshot belongs to which new snapshot. Pairing by position is
    # not safe: some snapshots may have no previous version, and the loaded snapshots come back
    # as a mapping that may be missing entries or be ordered differently from the request.
    previous_id_by_snapshot_id = {
        s.snapshot_id: s.previous_version.snapshot_id(s.name)
        for s in metadata_snapshots
        if s.previous_version
    }
    if not previous_id_by_snapshot_id:
        return {}

    # Bulk load all the previous snapshots and index them by their own ID
    previous_snapshots = {
        previous.snapshot_id: previous
        for previous in state_reader.get_snapshots(
            list(previous_id_by_snapshot_id.values())
        ).values()
    }

    # Check if any of the snapshots have modifications to the audits field by comparing the hashes
    audit_snapshots = {}
    for snapshot in metadata_snapshots:
        previous_snapshot = previous_snapshots.get(
            previous_id_by_snapshot_id.get(snapshot.snapshot_id)
        )
        if previous_snapshot is None:
            continue

        new_audits_hash = snapshot.model.audit_metadata_hash()
        previous_audit_hash = previous_snapshot.model.audit_metadata_hash()

        if snapshot.model.audits and previous_audit_hash != new_audits_hash:
            audit_snapshots[snapshot.snapshot_id] = snapshot

    return audit_snapshots
625+
626+
627+
def get_snapshots_to_create(
    plan: EvaluatablePlan, snapshots: t.Dict[SnapshotId, Snapshot]
) -> t.List[Snapshot]:
    """Selects the snapshots for which tables should be created as part of the plan.

    Args:
        plan: The plan being evaluated.
        snapshots: The candidate snapshots keyed by snapshot ID.

    Returns:
        The non-symbolic model snapshots that were either selected for backfill or are going
        to be promoted. When the plan's environment doesn't specify promoted snapshot IDs,
        every non-symbolic model snapshot is returned.
    """
    environment_promoted_ids = plan.environment.promoted_snapshot_ids
    promoted_ids = None if environment_promoted_ids is None else set(environment_promoted_ids)

    to_create = []
    for candidate in snapshots.values():
        # Only model snapshots that aren't symbolic are considered
        if not candidate.is_model or candidate.is_symbolic:
            continue

        # Only create tables for snapshots that we're planning to promote or that were selected for backfill
        if (
            plan.is_selected_for_backfill(candidate.name)
            or promoted_ids is None
            or candidate.snapshot_id in promoted_ids
        ):
            to_create.append(candidate)

    return to_create

0 commit comments

Comments
 (0)