Skip to content

Commit 50ca148

Browse files
committed
Report auto-restatement triggers only
1 parent 2f27609 commit 50ca148

9 files changed

Lines changed: 89 additions & 140 deletions

File tree

sqlmesh/core/console.py

Lines changed: 9 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -37,12 +37,7 @@
3737
SnapshotId,
3838
SnapshotInfoLike,
3939
)
40-
from sqlmesh.core.snapshot.definition import (
41-
Interval,
42-
Intervals,
43-
SnapshotTableInfo,
44-
SnapshotEvaluationTriggers,
45-
)
40+
from sqlmesh.core.snapshot.definition import Interval, Intervals, SnapshotTableInfo
4641
from sqlmesh.core.test import ModelTest
4742
from sqlmesh.utils import rich as srich
4843
from sqlmesh.utils import Verbosity
@@ -433,7 +428,7 @@ def update_snapshot_evaluation_progress(
433428
num_audits_passed: int,
434429
num_audits_failed: int,
435430
audit_only: bool = False,
436-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
431+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
437432
) -> None:
438433
"""Updates the snapshot evaluation progress."""
439434

@@ -581,7 +576,7 @@ def update_snapshot_evaluation_progress(
581576
num_audits_passed: int,
582577
num_audits_failed: int,
583578
audit_only: bool = False,
584-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
579+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
585580
) -> None:
586581
pass
587582

@@ -1063,7 +1058,7 @@ def update_snapshot_evaluation_progress(
10631058
num_audits_passed: int,
10641059
num_audits_failed: int,
10651060
audit_only: bool = False,
1066-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
1061+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10671062
) -> None:
10681063
"""Update the snapshot evaluation progress."""
10691064
if (
@@ -3647,7 +3642,7 @@ def update_snapshot_evaluation_progress(
36473642
num_audits_passed: int,
36483643
num_audits_failed: int,
36493644
audit_only: bool = False,
3650-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
3645+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36513646
) -> None:
36523647
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36533648

@@ -3817,22 +3812,15 @@ def update_snapshot_evaluation_progress(
38173812
num_audits_passed: int,
38183813
num_audits_failed: int,
38193814
audit_only: bool = False,
3820-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
3815+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38213816
) -> None:
38223817
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38233818

3824-
if snapshot_evaluation_triggers:
3825-
if snapshot_evaluation_triggers.ignore_cron_flag is not None:
3826-
message += f" | ignore_cron_flag={snapshot_evaluation_triggers.ignore_cron_flag}"
3827-
if snapshot_evaluation_triggers.cron_ready is not None:
3828-
message += f" | cron_ready={snapshot_evaluation_triggers.cron_ready}"
3829-
if snapshot_evaluation_triggers.auto_restatement_triggers:
3830-
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.auto_restatement_triggers)}"
3831-
if snapshot_evaluation_triggers.select_snapshot_triggers:
3832-
message += f" | select_snapshot_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.select_snapshot_triggers)}"
3819+
if auto_restatement_triggers:
3820+
message += f" | Auto-restatement triggers {', '.join(trigger.name for trigger in auto_restatement_triggers)}"
38333821

38343822
if audit_only:
3835-
message = f"Audited {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3823+
message = f"Audited {snapshot.name} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38363824

38373825
self._write(message)
38383826

sqlmesh/core/context.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,9 +2307,11 @@ def check_intervals(
23072307
}
23082308

23092309
if select_models:
2310-
selected, _ = self._select_models_for_run(select_models, True, snapshots.values())
2310+
selected: t.Collection[str] = self._select_models_for_run(
2311+
select_models, True, snapshots.values()
2312+
)
23112313
else:
2312-
selected = set(snapshots.keys())
2314+
selected = snapshots.keys()
23132315

23142316
results = {}
23152317
execution_context = self.execution_context(snapshots=snapshots)
@@ -2459,9 +2461,8 @@ def _run(
24592461
scheduler = self.scheduler(environment=environment)
24602462
snapshots = scheduler.snapshots
24612463

2462-
select_models_auto_upstream = None
24632464
if select_models is not None:
2464-
select_models, select_models_auto_upstream = self._select_models_for_run(
2465+
select_models = self._select_models_for_run(
24652466
select_models, no_auto_upstream, snapshots.values()
24662467
)
24672468

@@ -2473,7 +2474,6 @@ def _run(
24732474
ignore_cron=ignore_cron,
24742475
circuit_breaker=circuit_breaker,
24752476
selected_snapshots=select_models,
2476-
selected_snapshots_auto_upstream=select_models_auto_upstream,
24772477
auto_restatement_enabled=environment.lower() == c.PROD,
24782478
run_environment_statements=True,
24792479
)
@@ -2889,7 +2889,7 @@ def _select_models_for_run(
28892889
select_models: t.Collection[str],
28902890
no_auto_upstream: bool,
28912891
snapshots: t.Collection[Snapshot],
2892-
) -> t.Tuple[t.Set[str], t.Set[str]]:
2892+
) -> t.Set[str]:
28932893
models: UniqueKeyDict[str, Model] = UniqueKeyDict(
28942894
"models", **{s.name: s.model for s in snapshots if s.is_model}
28952895
)
@@ -2898,10 +2898,9 @@ def _select_models_for_run(
28982898
dag.add(fqn, model.depends_on)
28992899
model_selector = self._new_selector(models=models, dag=dag)
29002900
result = set(model_selector.expand_model_selections(select_models))
2901-
if no_auto_upstream:
2902-
return result, set()
2903-
result_with_upstream = set(dag.subdag(*result))
2904-
return result_with_upstream, result_with_upstream - result
2901+
if not no_auto_upstream:
2902+
result = set(dag.subdag(*result))
2903+
return result
29052904

29062905
@cached_property
29072906
def _project_type(self) -> str:

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def _missing_intervals(
553553
snapshots_by_name: t.Dict[str, Snapshot],
554554
deployability_index: DeployabilityIndex,
555555
) -> SnapshotToIntervals:
556-
missing_intervals = merged_missing_intervals(
556+
return merged_missing_intervals(
557557
snapshots=snapshots_by_name.values(),
558558
start=plan.start,
559559
end=plan.end,
@@ -568,7 +568,6 @@ def _missing_intervals(
568568
start_override_per_model=plan.start_override_per_model,
569569
end_override_per_model=plan.end_override_per_model,
570570
)
571-
return missing_intervals
572571

573572
def _get_audit_only_snapshots(
574573
self, new_snapshots: t.Dict[SnapshotId, Snapshot]

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
snapshots_to_dag,
2929
Intervals,
3030
)
31+
from sqlmesh.core.snapshot.definition import check_ready_intervals
3132
from sqlmesh.core.snapshot.definition import (
3233
Interval,
33-
SnapshotEvaluationTriggers,
3434
SnapshotIntervals,
35-
check_ready_intervals,
3635
expand_range,
3736
parent_snapshots_by_name,
3837
)
@@ -168,9 +167,6 @@ def merged_missing_intervals(
168167
end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
169168
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
170169
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
171-
172-
Returns:
173-
A dict containing all snapshots needing to be run with their associated interval params.
174170
"""
175171
snapshots_to_intervals = merged_missing_intervals(
176172
snapshots=self.snapshot_per_version.values(),
@@ -267,7 +263,6 @@ def run(
267263
ignore_cron: bool = False,
268264
end_bounded: bool = False,
269265
selected_snapshots: t.Optional[t.Set[str]] = None,
270-
selected_snapshots_auto_upstream: t.Optional[t.Set[str]] = None,
271266
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
272267
deployability_index: t.Optional[DeployabilityIndex] = None,
273268
auto_restatement_enabled: bool = False,
@@ -284,7 +279,6 @@ def run(
284279
ignore_cron=ignore_cron,
285280
end_bounded=end_bounded,
286281
selected_snapshots=selected_snapshots,
287-
selected_snapshots_auto_upstream=selected_snapshots_auto_upstream,
288282
circuit_breaker=circuit_breaker,
289283
deployability_index=deployability_index,
290284
auto_restatement_enabled=auto_restatement_enabled,
@@ -422,6 +416,7 @@ def run_merged_intervals(
422416
selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
423417
run_environment_statements: bool = False,
424418
audit_only: bool = False,
419+
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
425420
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
426421
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
427422
"""Runs precomputed batches of missing intervals.
@@ -539,9 +534,7 @@ def run_node(node: SchedulingUnit) -> None:
539534
evaluation_duration_ms,
540535
num_audits - num_audits_failed,
541536
num_audits_failed,
542-
snapshot_evaluation_triggers=snapshot_evaluation_triggers.get(
543-
snapshot.snapshot_id
544-
),
537+
auto_restatement_triggers=auto_restatement_triggers.get(snapshot.snapshot_id),
545538
)
546539
elif isinstance(node, CreateNode):
547540
self.snapshot_evaluator.create_snapshot(
@@ -694,7 +687,6 @@ def _run_or_audit(
694687
ignore_cron: bool = False,
695688
end_bounded: bool = False,
696689
selected_snapshots: t.Optional[t.Set[str]] = None,
697-
selected_snapshots_auto_upstream: t.Optional[t.Set[str]] = None,
698690
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
699691
deployability_index: t.Optional[DeployabilityIndex] = None,
700692
auto_restatement_enabled: bool = False,
@@ -718,7 +710,6 @@ def _run_or_audit(
718710
end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
719711
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
720712
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
721-
selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
722713
circuit_breaker: An optional handler which checks if the run should be aborted.
723714
deployability_index: Determines snapshots that are deployable in the context of this render.
724715
auto_restatement_enabled: Whether to enable auto restatements.
@@ -777,38 +768,9 @@ def _run_or_audit(
777768
return CompletionStatus.NOTHING_TO_DO
778769

779770
merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals}
780-
select_snapshot_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
781-
if selected_snapshots and selected_snapshots_auto_upstream:
782-
# actually selected snapshots are their own triggers
783-
selected_snapshots_no_auto_upstream = (
784-
selected_snapshots - selected_snapshots_auto_upstream
785-
)
786-
select_snapshot_triggers = {
787-
s_id: [s_id]
788-
for s_id in [
789-
snapshot_id
790-
for snapshot_id in merged_intervals_snapshots
791-
if snapshot_id.name in selected_snapshots_no_auto_upstream
792-
]
793-
}
794771

795-
# trace upstream by walking downstream on reversed dag
796-
reversed_dag = snapshots_to_dag(self.snapshots.values()).reversed
797-
for s_id in reversed_dag:
798-
if s_id in merged_intervals_snapshots:
799-
triggers = select_snapshot_triggers.get(s_id, [])
800-
for parent_s_id in reversed_dag.graph.get(s_id, set()):
801-
triggers.extend(select_snapshot_triggers.get(parent_s_id, []))
802-
select_snapshot_triggers[s_id] = list(dict.fromkeys(triggers))
803-
804-
all_snapshot_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {
805-
s_id: SnapshotEvaluationTriggers(
806-
ignore_cron_flag=ignore_cron,
807-
cron_ready=s_id not in auto_restated_snapshots,
808-
auto_restatement_triggers=auto_restatement_triggers.get(s_id, []),
809-
select_snapshot_triggers=select_snapshot_triggers.get(s_id, []),
810-
)
811-
for s_id in merged_intervals_snapshots
772+
auto_restatement_triggers_dict: t.Dict[SnapshotId, t.List[SnapshotId]] = {
773+
s_id: auto_restatement_triggers.get(s_id, []) for s_id in merged_intervals_snapshots
812774
}
813775

814776
errors, _ = self.run_merged_intervals(

sqlmesh/core/snapshot/definition.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from sqlmesh.core.model import Model, ModelKindMixin, ModelKindName, ViewKind, CustomKind
2222
from sqlmesh.core.model.definition import _Model
2323
from sqlmesh.core.node import IntervalUnit, NodeType
24-
from sqlmesh.utils import sanitize_name
24+
from sqlmesh.utils import sanitize_name, unique
2525
from sqlmesh.utils.dag import DAG
2626
from sqlmesh.utils.date import (
2727
TimeLike,
@@ -327,13 +327,6 @@ def table_name_for_environment(
327327
return table
328328

329329

330-
class SnapshotEvaluationTriggers(PydanticModel):
331-
ignore_cron_flag: t.Optional[bool] = None
332-
cron_ready: t.Optional[bool] = None
333-
auto_restatement_triggers: t.List[SnapshotId] = []
334-
select_snapshot_triggers: t.List[SnapshotId] = []
335-
336-
337330
class SnapshotInfoMixin(ModelKindMixin):
338331
name: str
339332
dev_version_: t.Optional[str]
@@ -2229,14 +2222,15 @@ def apply_auto_restatements(
22292222

22302223
# auto-restated snapshot is its own trigger
22312224
upstream_triggers = [s_id]
2225+
else:
2226+
# inherit each parent's auto-restatement triggers (if any)
2227+
for parent_s_id in snapshot.parents:
2228+
if parent_s_id in auto_restatement_triggers:
2229+
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
22322230

2233-
for parent_s_id in snapshot.parents:
2234-
if parent_s_id in auto_restatement_triggers:
2235-
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
2236-
2237-
# remove duplicate triggers
2231+
# remove duplicate triggers, retaining order and keeping first seen of duplicates
22382232
if upstream_triggers:
2239-
auto_restatement_triggers[s_id] = list(dict.fromkeys(upstream_triggers))
2233+
auto_restatement_triggers[s_id] = unique(upstream_triggers)
22402234

22412235
if auto_restated_intervals:
22422236
auto_restated_interval_start = sys.maxsize

0 commit comments

Comments
 (0)