2424)
2525from sqlmesh .core .state_sync import StateReader
2626from sqlmesh .core .snapshot .definition import (
27- Snapshot ,
2827 SnapshotInfoMixin ,
2928 SnapshotNameVersionLike ,
3029)
@@ -59,12 +58,7 @@ def evaluate(
5958
6059 # add extra metadata that's only needed at this point for better --explain output
6160 plan_stages = [
62- ExplainableRestatementStage .from_restatement_stage (
63- stage ,
64- self .state_reader ,
65- plan ,
66- fetch_full_snapshots = explainer_console .verbosity .is_very_verbose ,
67- )
61+ ExplainableRestatementStage .from_restatement_stage (stage , self .state_reader , plan )
6862 if isinstance (stage , stages .RestatementStage )
6963 else stage
7064 for stage in plan_stages
@@ -78,11 +72,6 @@ class ExplainerConsole(abc.ABC):
7872 def explain (self , stages : t .List [stages .PlanStage ]) -> None :
7973 pass
8074
81- @property
82- @abc .abstractmethod
83- def verbosity (self ) -> Verbosity :
84- pass
85-
8675
8776@dataclass
8877class ExplainableRestatementStage (stages .RestatementStage ):
@@ -91,9 +80,7 @@ class ExplainableRestatementStage(stages.RestatementStage):
9180 of what might happen when they ask for the plan to be explained
9281 """
9382
94- snapshot_intervals_to_clear : t .Dict [
95- str , t .List [t .Tuple [t .Optional [Snapshot ], SnapshotIntervalClearRequest ]]
96- ]
83+ snapshot_intervals_to_clear : t .Dict [str , t .List [SnapshotIntervalClearRequest ]]
9784 """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""
9885
9986 @classmethod
@@ -102,7 +89,6 @@ def from_restatement_stage(
10289 stage : stages .RestatementStage ,
10390 state_reader : StateReader ,
10491 plan : EvaluatablePlan ,
105- fetch_full_snapshots : bool = False ,
10692 ) -> ExplainableRestatementStage :
10793 loaded_snapshots = {s .snapshot_id : s for s in stage .all_snapshots .values ()}
10894
@@ -113,21 +99,10 @@ def from_restatement_stage(
11399 loaded_snapshots = loaded_snapshots ,
114100 )
115101
116- if fetch_full_snapshots :
117- # extend loaded_snapshots with the remaining full Snapshot objects from all_restatement_intervals
118- # so that we can generate physical table names for them while explaining what's going on
119- remaining_snapshot_ids_to_load = set (all_restatement_intervals ).difference (
120- loaded_snapshots
121- )
122- loaded_snapshots .update (
123- state_reader .get_snapshots (snapshot_ids = remaining_snapshot_ids_to_load )
124- )
125-
102+ # Group the interval clear requests by snapshot name to make them easier to write to the console
126103 snapshot_intervals_to_clear = defaultdict (list )
127- for snapshot_id , clear_request in all_restatement_intervals .items ():
128- snapshot_intervals_to_clear [clear_request .snapshot .name ].append (
129- (loaded_snapshots .get (snapshot_id ), clear_request )
130- )
104+ for _ , clear_request in all_restatement_intervals .items ():
105+ snapshot_intervals_to_clear [clear_request .snapshot .name ].append (clear_request )
131106
132107 return cls (
133108 snapshot_intervals_to_clear = snapshot_intervals_to_clear ,
@@ -150,13 +125,9 @@ def __init__(
150125 self .environment_naming_info = environment_naming_info
151126 self .dialect = dialect
152127 self .default_catalog = default_catalog
153- self ._verbosity = verbosity
128+ self .verbosity = verbosity
154129 self .console : RichConsole = console or srich .console
155130
156- @property
157- def verbosity (self ) -> Verbosity :
158- return self ._verbosity
159-
160131 def explain (self , stages : t .List [stages .PlanStage ]) -> None :
161132 tree = Tree ("[bold]Explained plan[/bold]" )
162133 for stage in stages :
@@ -246,50 +217,21 @@ def visit_restatement_stage(
246217 snapshot_intervals := stage .snapshot_intervals_to_clear
247218 ):
248219 for name , requests in snapshot_intervals .items ():
249- if not requests :
250- # ensure that there is at least one SnapshotIntervalClearRequest in the list
251- continue
252-
253220 display_name = model_display_name (
254221 name , self .environment_naming_info , self .default_catalog , self .dialect
255222 )
256- _ , clear_request = requests [0 ]
257- interval_start , interval_end = clear_request .interval
223+ interval_start = min (cr .interval [0 ] for cr in requests )
224+ interval_end = max (cr .interval [1 ] for cr in requests )
225+
226+ if not interval_start or not interval_end :
227+ continue
228+
258229 node = tree .add (f"{ display_name } [{ to_ts (interval_start )} - { to_ts (interval_end )} ]" )
259230
260- if not self .verbosity .is_very_verbose :
261- # In normal mode we just indicate which environments are affected at a high level
262- all_environment_names = sorted (
263- set (env_name for (_ , cr ) in requests for env_name in cr .environment_names )
264- )
265- node .add ("in environments: " + ", " .join (all_environment_names ))
266- else :
267- # In "very verbose" mode, we print all the affected physical tables
268-
269- # group by environment for the console output
270- by_environment : t .Dict [t .Optional [str ], t .List [Snapshot ]] = defaultdict (list )
271-
272- for snapshot , clear_request in requests :
273- if not snapshot :
274- # This check is mostly for mypy, snapshots should have been loaded
275- # while creating the ExplainableRestatementStage if the verbosity was set to VERY_VERBOSE
276- continue
277- if clear_request .sorted_environment_names :
278- # snapshot is promoted in these environments
279- for env in clear_request .sorted_environment_names :
280- by_environment [env ].append (snapshot )
281- else :
282- # snapshot is not currently promoted in any environment
283- by_environment [None ].append (snapshot )
284-
285- for env_name , snapshots_to_clear in by_environment .items ():
286- env_name = env_name or "(no env)"
287- for snapshot in snapshots_to_clear :
288- # note: we dont need a DeployabilityIndex and can just hardcode is_deployable=True.
289- # The reason is that non-deployable data can never be restated so we only need to
290- # bother clearing intervals for the deployable version of the table
291- physical_table_name = snapshot .table_name (True )
292- node .add (f"{ env_name } -> { physical_table_name } " )
231+ all_environment_names = sorted (
232+ set (env_name for cr in requests for env_name in cr .environment_names )
233+ )
234+ node .add ("in environments: " + ", " .join (all_environment_names ))
293235
294236 return tree
295237
0 commit comments