Skip to content

Commit 2f27609

Browse files
committed
Infer cron ready from auto-restatement intervals
1 parent dd6a35f commit 2f27609

4 files changed

Lines changed: 20 additions & 23 deletions

File tree

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def _missing_intervals(
553553
snapshots_by_name: t.Dict[str, Snapshot],
554554
deployability_index: DeployabilityIndex,
555555
) -> SnapshotToIntervals:
556-
missing_intervals, _ = merged_missing_intervals(
556+
missing_intervals = merged_missing_intervals(
557557
snapshots=snapshots_by_name.values(),
558558
start=plan.start,
559559
end=plan.end,

sqlmesh/core/scheduler.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@
2828
snapshots_to_dag,
2929
Intervals,
3030
)
31-
from sqlmesh.core.snapshot.definition import check_ready_intervals
3231
from sqlmesh.core.snapshot.definition import (
3332
Interval,
3433
SnapshotEvaluationTriggers,
34+
SnapshotIntervals,
35+
check_ready_intervals,
3536
expand_range,
3637
parent_snapshots_by_name,
3738
)
@@ -147,7 +148,7 @@ def merged_missing_intervals(
147148
ignore_cron: bool = False,
148149
end_bounded: bool = False,
149150
selected_snapshots: t.Optional[t.Set[str]] = None,
150-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
151+
) -> SnapshotToIntervals:
151152
"""Find the largest contiguous date interval parameters based only on what is missing.
152153
153154
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -169,9 +170,9 @@ def merged_missing_intervals(
169170
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
170171
171172
Returns:
172-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
173+
A dict containing all snapshots needing to be run with their associated interval params.
173174
"""
174-
snapshots_to_intervals, snapshots_naive_cron_ready = merged_missing_intervals(
175+
snapshots_to_intervals = merged_missing_intervals(
175176
snapshots=self.snapshot_per_version.values(),
176177
start=start,
177178
end=end,
@@ -189,7 +190,7 @@ def merged_missing_intervals(
189190
snapshots_to_intervals = {
190191
s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
191192
}
192-
return snapshots_to_intervals, snapshots_naive_cron_ready
193+
return snapshots_to_intervals
193194

194195
def evaluate(
195196
self,
@@ -748,6 +749,7 @@ def _run_or_audit(
748749
for s_id, interval in (remove_intervals or {}).items():
749750
self.snapshots[s_id].remove_interval(interval)
750751

752+
auto_restated_intervals: t.List[SnapshotIntervals] = []
751753
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
752754
if auto_restatement_enabled:
753755
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
@@ -757,8 +759,9 @@ def _run_or_audit(
757759
self.state_sync.update_auto_restatements(
758760
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
759761
)
762+
auto_restated_snapshots = {snapshot.snapshot_id for snapshot in auto_restated_intervals}
760763

761-
merged_intervals, snapshots_naive_cron_ready = self.merged_missing_intervals(
764+
merged_intervals = self.merged_missing_intervals(
762765
start,
763766
end,
764767
execution_time,
@@ -801,7 +804,7 @@ def _run_or_audit(
801804
all_snapshot_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {
802805
s_id: SnapshotEvaluationTriggers(
803806
ignore_cron_flag=ignore_cron,
804-
cron_ready=s_id in snapshots_naive_cron_ready,
807+
cron_ready=s_id not in auto_restated_snapshots,
805808
auto_restatement_triggers=auto_restatement_triggers.get(s_id, []),
806809
select_snapshot_triggers=select_snapshot_triggers.get(s_id, []),
807810
)
@@ -969,7 +972,7 @@ def merged_missing_intervals(
969972
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
970973
ignore_cron: bool = False,
971974
end_bounded: bool = False,
972-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
975+
) -> SnapshotToIntervals:
973976
"""Find the largest contiguous date interval parameters based only on what is missing.
974977
975978
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -1019,7 +1022,7 @@ def compute_interval_params(
10191022
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
10201023
ignore_cron: bool = False,
10211024
end_bounded: bool = False,
1022-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
1025+
) -> SnapshotToIntervals:
10231026
"""Find the largest contiguous date interval parameters based only on what is missing.
10241027
10251028
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -1041,7 +1044,7 @@ def compute_interval_params(
10411044
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
10421045
10431046
Returns:
1044-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
1047+
A dict containing all snapshots needing to be run with their associated interval params.
10451048
"""
10461049
snapshot_merged_intervals = {}
10471050

@@ -1069,11 +1072,7 @@ def compute_interval_params(
10691072
contiguous_batch.append((next_batch[0][0], next_batch[-1][-1]))
10701073
snapshot_merged_intervals[snapshot] = contiguous_batch
10711074

1072-
snapshots_naive_cron_ready = [
1073-
snap.snapshot_id for snap in missing_intervals(snapshots, execution_time=execution_time)
1074-
]
1075-
1076-
return snapshot_merged_intervals, snapshots_naive_cron_ready
1075+
return snapshot_merged_intervals
10771076

10781077

10791078
def interval_diff(

tests/core/test_scheduler.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ def test_interval_params(scheduler: Scheduler, sushi_context_fixed_date: Context
5959
start_ds = "2022-01-01"
6060
end_ds = "2022-02-05"
6161

62-
interval_params, _ = compute_interval_params(
63-
[orders, waiter_revenue], start=start_ds, end=end_ds
64-
)
62+
interval_params = compute_interval_params([orders, waiter_revenue], start=start_ds, end=end_ds)
6563
assert interval_params == {
6664
orders: [
6765
(to_timestamp(start_ds), to_timestamp("2022-02-06")),
@@ -82,7 +80,7 @@ def _get_batched_missing_intervals(
8280
end: TimeLike,
8381
execution_time: t.Optional[TimeLike] = None,
8482
) -> SnapshotToIntervals:
85-
merged_intervals, _ = scheduler.merged_missing_intervals(start, end, execution_time)
83+
merged_intervals = scheduler.merged_missing_intervals(start, end, execution_time)
8684
return scheduler.batch_intervals(merged_intervals, mocker.Mock(), mocker.Mock())
8785

8886
return _get_batched_missing_intervals
@@ -94,7 +92,7 @@ def test_interval_params_nonconsecutive(scheduler: Scheduler, orders: Snapshot):
9492

9593
orders.add_interval("2022-01-10", "2022-01-15")
9694

97-
interval_params, _ = compute_interval_params([orders], start=start_ds, end=end_ds)
95+
interval_params = compute_interval_params([orders], start=start_ds, end=end_ds)
9896
assert interval_params == {
9997
orders: [
10098
(to_timestamp(start_ds), to_timestamp("2022-01-10")),
@@ -111,7 +109,7 @@ def test_interval_params_missing(scheduler: Scheduler, sushi_context_fixed_date:
111109

112110
start_ds = "2022-01-01"
113111
end_ds = "2022-03-01"
114-
interval_params, _ = compute_interval_params(
112+
interval_params = compute_interval_params(
115113
sushi_context_fixed_date.snapshots.values(), start=start_ds, end=end_ds
116114
)
117115
assert interval_params[waiters] == [

web/server/api/endpoints/plan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def _get_plan_changes(context: Context, plan: Plan) -> models.PlanChanges:
132132

133133
def _get_plan_backfills(context: Context, plan: Plan) -> t.Dict[str, t.Any]:
134134
"""Get plan backfills"""
135-
merged_intervals, _ = context.scheduler().merged_missing_intervals()
135+
merged_intervals = context.scheduler().merged_missing_intervals()
136136
batches = context.scheduler().batch_intervals(merged_intervals, None, EnvironmentNamingInfo())
137137
tasks = {snapshot.name: len(intervals) for snapshot, intervals in batches.items()}
138138
snapshots = plan.context_diff.snapshots

0 commit comments

Comments
 (0)