Commit ad5466b

Widen parent start date override if it ends up being later than a child start date override

1 parent 29c0ed5 commit ad5466b

2 files changed: 174 additions & 0 deletions

sqlmesh/core/plan/builder.py

Lines changed: 37 additions & 0 deletions
@@ -282,6 +282,7 @@ def build(self) -> Plan:
         self._check_destructive_changes(directly_modified)
         self._categorize_snapshots(dag, indirectly_modified)
         self._adjust_new_snapshot_intervals()
+        self._adjust_start_overrides(dag)

         deployability_index = (
             DeployabilityIndex.create(
@@ -524,6 +525,42 @@ def _adjust_new_snapshot_intervals(self) -> None:
             if new.is_forward_only:
                 new.dev_intervals = new.intervals.copy()

+    def _adjust_start_overrides(self, dag: DAG[SnapshotId]) -> None:
+        if not self._start_override_per_model:
+            return
+
+        start_override_by_snapshot_id = {
+            self._context_diff.snapshots_by_name[name].snapshot_id: start_date
+            for name, start_date in self._start_override_per_model.items()
+        }
+
+        for current_snapshot_id in dag:
+            # we only care about adjusting the start date for incremental models
+            current_snapshot = self._context_diff.snapshots[current_snapshot_id]
+            if not current_snapshot.is_incremental:
+                continue
+
+            earliest_downstream_start_date_override = min(
+                (
+                    start_override_by_snapshot_id[downstream_sid]
+                    for downstream_sid in dag.downstream(current_snapshot_id)
+                    if downstream_sid in start_override_by_snapshot_id
+                ),
+                default=None,
+            )
+
+            if earliest_downstream_start_date_override:
+                current_start_date_override = start_override_by_snapshot_id.get(current_snapshot_id)
+
+                # if any of our downstream snapshots have a start date override earlier than us, we need to widen ourselves to include it
+                # otherwise, the downstream snapshots will only get a subset of the data they need
+                if not current_start_date_override or (
+                    earliest_downstream_start_date_override < current_start_date_override
+                ):
+                    self._start_override_per_model[current_snapshot.name] = (
+                        earliest_downstream_start_date_override
+                    )
+
     def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
         for s_id in sorted(directly_modified):
             snapshot = self._context_diff.snapshots[s_id]

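For orientation, here is a minimal standalone sketch of the widening rule that _adjust_start_overrides applies. The dict-based DAG and the children, downstream, and start_overrides names are hypothetical stand-ins for SQLMesh's DAG and snapshot types, not the real API:

from datetime import datetime

# parent -> direct children (i.e. models that select from the parent);
# mirrors the test scenario A <- B <- C plus an unrelated D
children = {"A": ["B"], "B": ["C"], "C": [], "D": []}

def downstream(node: str) -> list[str]:
    # all transitive descendants of `node`
    out: list[str] = []
    stack = list(children[node])
    while stack:
        cur = stack.pop()
        out.append(cur)
        stack.extend(children[cur])
    return out

# per-model start date overrides before adjustment; C reaches furthest
# back, so its parents must be widened to cover it
start_overrides = {
    "B": datetime(2020, 1, 30),
    "C": datetime(2020, 1, 19),
}

for node in ["A", "B", "C", "D"]:  # order is irrelevant: each node sees all descendants
    earliest = min(
        (start_overrides[d] for d in downstream(node) if d in start_overrides),
        default=None,
    )
    # widen the parent whenever a child needs data from earlier than the
    # parent currently provides
    if earliest and (node not in start_overrides or earliest < start_overrides[node]):
        start_overrides[node] = earliest

print(start_overrides)  # A and B are widened to 2020-01-19; D is untouched

The real implementation additionally skips non-incremental snapshots, since only incremental models are backfilled interval by interval and could otherwise hand their children a subset of the data they need.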
tests/core/test_context.py

Lines changed: 137 additions & 0 deletions
@@ -2477,3 +2477,140 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
     ) == [
         (to_datetime("2020-01-18 00:00:00"), to_datetime("2020-01-18 23:59:59.999999")),
     ]
+
+
+def test_plan_min_intervals_adjusted_for_downstream(tmp_path: Path):
+    """
+    Scenario:
+        A(hourly) <- B(daily) <- C(weekly)
+        D(hourly)
+
+    Each model's backfill needs to cover at least :min_intervals of every downstream model for the plan to be valid.
+    In this scenario, if min_intervals=1:
+        - A would need to cover at least 168 hours (7 days * 24 hours) because its downstream model C is weekly
+        - B would need to cover at least 7 days because its downstream model C is weekly
+        - C would need to cover at least 1 week because min_intervals: 1
+        - D is unrelated to A, B and C so would only need to cover 1 hour to satisfy min_intervals: 1
+    """
+
+    init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb")
+
+    context = Context(
+        paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
+    )
+
+    current_time = to_datetime("2020-02-01 00:00:01")
+
+    # initial state of example project
+    context.plan(auto_apply=True, execution_time=current_time)
+
+    (tmp_path / "models" / "hourly_model.sql").write_text("""
+    MODEL (
+        name sqlmesh_example.hourly_model,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column start_dt,
+            batch_size 1
+        ),
+        start '2020-01-01',
+        cron '@hourly'
+    );
+
+    select @start_dt as start_dt, @end_dt as end_dt;
+    """)
+
+    (tmp_path / "models" / "unrelated_hourly_model.sql").write_text("""
+    MODEL (
+        name sqlmesh_example.unrelated_hourly_model,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column start_dt
+        ),
+        start '2020-01-01',
+        cron '@hourly'
+    );
+
+    select @start_dt as start_dt, @end_dt as end_dt;
+    """)
+
+    (tmp_path / "models" / "daily_model.sql").write_text("""
+    MODEL (
+        name sqlmesh_example.daily_model,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column start_dt
+        ),
+        start '2020-01-01',
+        cron '@daily'
+    );
+
+    select start_dt, end_dt from sqlmesh_example.hourly_model where start_dt between @start_dt and @end_dt;
+    """)
+
+    (tmp_path / "models" / "weekly_model.sql").write_text("""
+    MODEL (
+        name sqlmesh_example.weekly_model,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column start_dt
+        ),
+        start '2020-01-01',
+        cron '@weekly'
+    );
+
+    select start_dt, end_dt from sqlmesh_example.daily_model where start_dt between @start_dt and @end_dt;
+    """)
+
+    context.load()
+
+    # create a dev env for "1 day ago" with min_intervals=1
+    # this should force a week's worth of intervals for every model
+    plan = context.plan(
+        environment="pr_env",
+        start="1 day ago",
+        execution_time=current_time,
+        min_intervals=1,
+    )
+
+    def _get_missing_intervals(name: str) -> t.List[t.Tuple[datetime, datetime]]:
+        snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id
+        snapshot_intervals = next(
+            si for si in plan.missing_intervals if si.snapshot_id == snapshot_id
+        )
+        return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals]
+
+    # We only operate on completed intervals, so given the current_time this is the range of the last completed week
+    assert _get_missing_intervals("sqlmesh_example.weekly_model") == [
+        (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-26 00:00:00"))
+    ]
+
+    # The daily model needs to cover the week, so it gets its start date moved back to line up
+    assert _get_missing_intervals("sqlmesh_example.daily_model") == [
+        (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
+    ]
+
+    # The hourly model needs to cover both the daily model and the weekly model, so it also gets its start date moved back to line up with the weekly model
+    assert _get_missing_intervals("sqlmesh_example.hourly_model") == [
+        (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
+    ]
+
+    # The unrelated model has no downstream constraints, so its start date doesn't get moved to line up with the weekly model
+    # However it still gets backfilled for 24 hours because the plan start is "1 day ago" and this satisfies min_intervals: 1
+    assert _get_missing_intervals("sqlmesh_example.unrelated_hourly_model") == [
+        (to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))
+    ]
+
+    # Check that actually running the plan produces the correct result, since missing intervals are re-calculated in the evaluator
+    context.apply(plan)
+
+    assert context.engine_adapter.fetchall(
+        "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.weekly_model"
+    ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 23:59:59.999999"))]
+
+    assert context.engine_adapter.fetchall(
+        "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.daily_model"
+    ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
+
+    assert context.engine_adapter.fetchall(
+        "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.hourly_model"
+    ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
+
+    assert context.engine_adapter.fetchall(
+        "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.unrelated_hourly_model"
+    ) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]

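As a back-of-the-envelope check of the interval arithmetic these assertions rely on, the following sketch recomputes the expected ranges. The dates come from the test itself; the variable names are ours and not part of the test or of SQLMesh:

from datetime import datetime, timedelta

execution_time = datetime(2020, 2, 1, 0, 0, 1)

# only completed intervals are operated on, so the weekly model's single
# required interval is the last fully elapsed week before execution_time
weekly_start, weekly_end = datetime(2020, 1, 19), datetime(2020, 1, 26)
assert weekly_end - weekly_start == timedelta(weeks=1)  # satisfies min_intervals=1

# the daily and hourly parents are widened back to the weekly start but
# still run through their own last completed interval (midnight Feb 1)
parent_start, parent_end = weekly_start, datetime(2020, 2, 1)
assert parent_end - parent_start >= timedelta(hours=7 * 24)  # covers the 168 hours

# the unrelated hourly model keeps the plan start of "1 day ago", which
# already satisfies min_intervals=1 for an hourly cron
unrelated_start = datetime(2020, 1, 31)
assert parent_end - unrelated_start == timedelta(hours=24)

The fetchall assertions reflect the same ranges from the engine's point of view, with max(end_dt) being the inclusive end of the last backfilled interval rather than the exclusive boundary.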