@@ -2139,7 +2139,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21392139
21402140def apply_auto_restatements (
21412141 snapshots : t .Dict [SnapshotId , Snapshot ], execution_time : TimeLike
2142- ) -> t .List [SnapshotIntervals ]:
2142+ ) -> t .Tuple [ t . List [SnapshotIntervals ], t . Dict [ SnapshotId , SnapshotId ] ]:
21432143 """Applies auto restatements to the snapshots.
21442144
21452145 This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2154,6 +2154,8 @@ def apply_auto_restatements(
21542154 A list of SnapshotIntervals with **new** intervals that need to be restated.
21552155 """
21562156 dag = snapshots_to_dag (snapshots .values ())
2157+ snapshots_with_auto_restatements : t .List [SnapshotId ] = []
2158+ auto_restatement_triggers : t .Dict [SnapshotId , SnapshotId ] = {}
21572159 auto_restated_intervals_per_snapshot : t .Dict [SnapshotId , Interval ] = {}
21582160 for s_id in dag :
21592161 if s_id not in snapshots :
@@ -2177,6 +2179,23 @@ def apply_auto_restatements(
21772179 )
21782180 auto_restated_intervals .append (next_auto_restated_interval )
21792181
2182+ # auto-restated snapshot is its own trigger
2183+ snapshots_with_auto_restatements .append (s_id )
2184+ auto_restatement_triggers [s_id ] = s_id
2185+ else :
2186+ for parent_s_id in snapshot .parents :
2187+ # first auto-restated parent is the trigger
2188+ if parent_s_id in snapshots_with_auto_restatements :
2189+ auto_restatement_triggers [s_id ] = parent_s_id
2190+ break
2191+ # if no trigger yet and parent has trigger, inherit their trigger
2192+ # - will be overwritten if a different parent is auto-restated
2193+ if (
2194+ parent_s_id in auto_restatement_triggers
2195+ and s_id not in auto_restatement_triggers
2196+ ):
2197+ auto_restatement_triggers [s_id ] = auto_restatement_triggers [parent_s_id ]
2198+
21802199 if auto_restated_intervals :
21812200 auto_restated_interval_start = sys .maxsize
21822201 auto_restated_interval_end = - sys .maxsize
@@ -2206,20 +2225,22 @@ def apply_auto_restatements(
22062225
22072226 snapshot .apply_pending_restatement_intervals ()
22082227 snapshot .update_next_auto_restatement_ts (execution_time )
2209-
2210- return [
2211- SnapshotIntervals (
2212- name = snapshots [s_id ].name ,
2213- identifier = None ,
2214- version = snapshots [s_id ].version ,
2215- dev_version = None ,
2216- intervals = [],
2217- dev_intervals = [],
2218- pending_restatement_intervals = [interval ],
2219- )
2220- for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2221- if s_id in snapshots
2222- ]
2228+ return (
2229+ [
2230+ SnapshotIntervals (
2231+ name = snapshots [s_id ].name ,
2232+ identifier = None ,
2233+ version = snapshots [s_id ].version ,
2234+ dev_version = None ,
2235+ intervals = [],
2236+ dev_intervals = [],
2237+ pending_restatement_intervals = [interval ],
2238+ )
2239+ for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2240+ if s_id in snapshots
2241+ ],
2242+ auto_restatement_triggers ,
2243+ )
22232244
22242245
22252246def parent_snapshots_by_name (
0 commit comments