3131from sqlmesh .core .snapshot .definition import check_ready_intervals
3232from sqlmesh .core .snapshot .definition import (
3333 Interval ,
34+ SnapshotEvaluationTriggers ,
3435 expand_range ,
3536 parent_snapshots_by_name ,
3637)
@@ -262,6 +263,7 @@ def run(
262263 ignore_cron : bool = False ,
263264 end_bounded : bool = False ,
264265 selected_snapshots : t .Optional [t .Set [str ]] = None ,
266+ selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
265267 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
266268 deployability_index : t .Optional [DeployabilityIndex ] = None ,
267269 auto_restatement_enabled : bool = False ,
@@ -278,6 +280,7 @@ def run(
278280 ignore_cron = ignore_cron ,
279281 end_bounded = end_bounded ,
280282 selected_snapshots = selected_snapshots ,
283+ selected_snapshots_auto_upstream = selected_snapshots_auto_upstream ,
281284 circuit_breaker = circuit_breaker ,
282285 deployability_index = deployability_index ,
283286 auto_restatement_enabled = auto_restatement_enabled ,
@@ -532,7 +535,9 @@ def run_node(node: SchedulingUnit) -> None:
532535 evaluation_duration_ms ,
533536 num_audits - num_audits_failed ,
534537 num_audits_failed ,
535- auto_restatement_triggers = auto_restatement_triggers .get (snapshot .snapshot_id ),
538+ snapshot_evaluation_triggers = snapshot_evaluation_triggers .get (
539+ snapshot .snapshot_id
540+ ),
536541 )
537542 elif isinstance (node , CreateNode ):
538543 self .snapshot_evaluator .create_snapshot (
@@ -685,6 +690,7 @@ def _run_or_audit(
685690 ignore_cron : bool = False ,
686691 end_bounded : bool = False ,
687692 selected_snapshots : t .Optional [t .Set [str ]] = None ,
693+ selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
688694 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
689695 deployability_index : t .Optional [DeployabilityIndex ] = None ,
690696 auto_restatement_enabled : bool = False ,
@@ -708,6 +714,7 @@ def _run_or_audit(
708714 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
709715 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
710716 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
717+ selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
711718 circuit_breaker: An optional handler which checks if the run should be aborted.
712719 deployability_index: Determines snapshots that are deployable in the context of this render.
713720 auto_restatement_enabled: Whether to enable auto restatements.
@@ -763,6 +770,42 @@ def _run_or_audit(
763770 if not merged_intervals :
764771 return CompletionStatus .NOTHING_TO_DO
765772
773+ merged_intervals_snapshots = {
774+ snapshot .snapshot_id : snapshot for snapshot in merged_intervals .keys ()
775+ }
776+ select_snapshot_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {}
777+ if selected_snapshots and selected_snapshots_auto_upstream :
778+ # actually selected snapshots are their own triggers
779+ selected_snapshots_no_auto_upstream = (
780+ selected_snapshots - selected_snapshots_auto_upstream
781+ )
782+ select_snapshot_triggers = {
783+ s_id : [s_id ]
784+ for s_id in [
785+ snapshot_id
786+ for snapshot_id in merged_intervals_snapshots
787+ if snapshot_id .name in selected_snapshots_no_auto_upstream
788+ ]
789+ }
790+
791+ # trace upstream by reversing dag of all snapshots to evaluate
792+ reversed_intervals_dag = snapshots_to_dag (merged_intervals_snapshots .values ()).reversed
793+ for s_id in reversed_intervals_dag :
794+ if s_id not in select_snapshot_triggers :
795+ triggers = []
796+ for parent_s_id in merged_intervals_snapshots [s_id ].parents :
797+ triggers .extend (select_snapshot_triggers [parent_s_id ])
798+ select_snapshot_triggers [s_id ] = list (dict .fromkeys (triggers ))
799+
800+ all_snapshot_triggers : t .Dict [SnapshotId , SnapshotEvaluationTriggers ] = {
801+ s_id : SnapshotEvaluationTriggers (
802+ ignore_cron = ignore_cron ,
803+ auto_restatement_triggers = auto_restatement_triggers .get (s_id , []),
804+ select_snapshot_triggers = select_snapshot_triggers .get (s_id , []),
805+ )
806+ for s_id in merged_intervals_snapshots
807+ if ignore_cron or s_id in auto_restatement_triggers or s_id in select_snapshot_triggers
808+ }
766809 errors , _ = self .run_merged_intervals (
767810 merged_intervals = merged_intervals ,
768811 deployability_index = deployability_index ,
@@ -773,6 +816,7 @@ def _run_or_audit(
773816 end = end ,
774817 run_environment_statements = run_environment_statements ,
775818 audit_only = audit_only ,
819+ restatements = remove_intervals ,
776820 auto_restatement_triggers = auto_restatement_triggers ,
777821 )
778822
0 commit comments