Skip to content

Commit ece068b

Browse files
authored
Fix: Selectively widen restatement intervals if they affect a downstream model that cannot be partially restated (#3902)
1 parent df83c16 commit ece068b

2 files changed

Lines changed: 184 additions & 5 deletions

File tree

sqlmesh/core/plan/evaluator.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,9 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
438438
# Without this rule, it's possible that promoting a dev table to prod will introduce old data to prod
439439
snapshot_intervals_to_restate.update(
440440
self._restatement_intervals_across_all_environments(
441-
plan.restatements, plan.disabled_restatement_models
441+
prod_restatements=plan.restatements,
442+
disable_restatement_models=plan.disabled_restatement_models,
443+
loaded_snapshots={s.snapshot_id: s for s in snapshots_by_name.values()},
442444
)
443445
)
444446

@@ -448,7 +450,10 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
448450
)
449451

450452
def _restatement_intervals_across_all_environments(
451-
self, prod_restatements: t.Dict[str, Interval], disable_restatement_models: t.Set[str]
453+
self,
454+
prod_restatements: t.Dict[str, Interval],
455+
disable_restatement_models: t.Set[str],
456+
loaded_snapshots: t.Dict[SnapshotId, Snapshot],
452457
) -> t.Set[t.Tuple[SnapshotTableInfo, Interval]]:
453458
"""
454459
Given a map of snapshot names + intervals to restate in prod:
@@ -462,7 +467,7 @@ def _restatement_intervals_across_all_environments(
462467
if not prod_restatements:
463468
return set()
464469

465-
snapshots_to_restate: t.Set[t.Tuple[SnapshotTableInfo, Interval]] = set()
470+
snapshots_to_restate: t.Dict[SnapshotId, t.Tuple[SnapshotTableInfo, Interval]] = {}
466471

467472
for env in self.state_sync.get_environments():
468473
keyed_snapshots = {s.name: s.table_info for s in env.snapshots}
@@ -480,10 +485,47 @@ def _restatement_intervals_across_all_environments(
480485
if x not in disable_restatement_models
481486
]
482487
snapshots_to_restate.update(
483-
{(keyed_snapshots[a], intervals) for a in affected_snapshot_names}
488+
{
489+
keyed_snapshots[a].snapshot_id: (keyed_snapshots[a], intervals)
490+
for a in affected_snapshot_names
491+
}
492+
)
493+
494+
# for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
495+
# include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
496+
# so we only do it if necessary
497+
if full_history_restatement_snapshot_ids := [
498+
# FIXME: full_history_restatement_only is just one indicator that the snapshot can only be fully refreshed, the other one is Model.depends_on_self
499+
# however, to figure out depends_on_self, we have to render all the model queries which, alongside having to fetch full snapshots from state,
500+
# is problematic in secure environments that are deliberately isolated from arbitrary user code (since rendering a query may require user macros to be present)
501+
# So for now, these are not considered
502+
s_id
503+
for s_id, s in snapshots_to_restate.items()
504+
if s[0].full_history_restatement_only
505+
]:
506+
# only load full snapshot records that we haven't already loaded
507+
additional_snapshots = self.state_sync.get_snapshots(
508+
[
509+
s.snapshot_id
510+
for s in full_history_restatement_snapshot_ids
511+
if s.snapshot_id not in loaded_snapshots
512+
]
513+
)
514+
515+
all_snapshots = loaded_snapshots | additional_snapshots
516+
517+
for full_snapshot_id in full_history_restatement_snapshot_ids:
518+
full_snapshot = all_snapshots[full_snapshot_id]
519+
_, original_intervals = snapshots_to_restate[full_snapshot_id]
520+
original_start, original_end = original_intervals
521+
522+
# get_removal_interval() widens intervals if necessary
523+
new_intervals = full_snapshot.get_removal_interval(
524+
start=original_start, end=original_end
484525
)
526+
snapshots_to_restate[full_snapshot_id] = (full_snapshot.table_info, new_intervals)
485527

486-
return snapshots_to_restate
528+
return set(snapshots_to_restate.values())
487529

488530

489531
class BaseAirflowPlanEvaluator(PlanEvaluator):

tests/core/test_integration.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3027,6 +3027,143 @@ def _dates_in_table(table_name: str) -> t.List[str]:
30273027
]
30283028

30293029

3030+
def test_prod_restatement_plan_causes_dev_intervals_to_be_widened_on_full_restatement_only_model(
3031+
tmp_path,
3032+
):
3033+
"""
3034+
Scenario:
3035+
I have an INCREMENTAL_BY_TIME_RANGE model A[daily] in prod
3036+
I create dev and add a INCREMENTAL_BY_UNIQUE_KEY model B (which supports full restatement only)
3037+
In prod, I restate one day of A, which should cause intervals in dev to be cleared (but not processed)
3038+
In dev, I run a plan
3039+
3040+
Outcome:
3041+
In the dev plan, the entire model for B should be rebuilt because it does not support partial restatement
3042+
"""
3043+
3044+
model_a = """
3045+
MODEL (
3046+
name test.a,
3047+
kind INCREMENTAL_BY_TIME_RANGE (
3048+
time_column "ts"
3049+
),
3050+
start '2024-01-01 00:00:00',
3051+
cron '@daily'
3052+
);
3053+
3054+
select account_id, ts from test.external_table where ts between @start_ts and @end_ts;
3055+
"""
3056+
3057+
model_b = """
3058+
MODEL (
3059+
name test.b,
3060+
kind INCREMENTAL_BY_UNIQUE_KEY (
3061+
unique_key (account_id, ts)
3062+
),
3063+
cron '@daily'
3064+
);
3065+
3066+
select account_id, ts from test.a where ts between @start_ts and @end_ts;
3067+
"""
3068+
3069+
models_dir = tmp_path / "models"
3070+
models_dir.mkdir()
3071+
3072+
with open(models_dir / "a.sql", "w") as f:
3073+
f.write(model_a)
3074+
3075+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
3076+
ctx = Context(paths=[tmp_path], config=config)
3077+
3078+
engine_adapter = ctx.engine_adapter
3079+
engine_adapter.create_schema("test")
3080+
3081+
# source data
3082+
df = pd.DataFrame(
3083+
{
3084+
"account_id": [1001, 1002, 1003, 1004],
3085+
"ts": [
3086+
"2024-01-01 00:30:00",
3087+
"2024-01-02 01:30:00",
3088+
"2024-01-03 02:30:00",
3089+
"2024-01-04 00:30:00",
3090+
],
3091+
}
3092+
)
3093+
columns_to_types = {
3094+
"account_id": exp.DataType.build("int"),
3095+
"ts": exp.DataType.build("timestamp"),
3096+
}
3097+
external_table = exp.table_(table="external_table", db="test", quoted=True)
3098+
engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
3099+
engine_adapter.insert_append(
3100+
table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
3101+
)
3102+
3103+
# plan + apply A[daily] in prod
3104+
ctx.plan(auto_apply=True)
3105+
3106+
# add B[daily] in dev
3107+
with open(models_dir / "b.sql", "w") as f:
3108+
f.write(model_b)
3109+
3110+
# plan + apply dev
3111+
ctx.load()
3112+
ctx.plan(environment="dev", auto_apply=True)
3113+
3114+
def _dates_in_table(table_name: str) -> t.List[str]:
3115+
return [
3116+
str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts")
3117+
]
3118+
3119+
# verify initial state
3120+
for tbl in ["test.a", "test__dev.b"]:
3121+
assert _dates_in_table(tbl) == [
3122+
"2024-01-01 00:30:00",
3123+
"2024-01-02 01:30:00",
3124+
"2024-01-03 02:30:00",
3125+
"2024-01-04 00:30:00",
3126+
]
3127+
3128+
# restate A in prod
3129+
engine_adapter.execute("delete from test.external_table where ts = '2024-01-02 01:30:00'")
3130+
ctx.plan(
3131+
restate_models=["test.a"],
3132+
start="2024-01-02 00:00:00",
3133+
end="2024-01-03 00:00:00",
3134+
auto_apply=True,
3135+
no_prompts=True,
3136+
)
3137+
3138+
# verify result
3139+
assert _dates_in_table("test.a") == [
3140+
"2024-01-01 00:30:00",
3141+
"2024-01-03 02:30:00",
3142+
"2024-01-04 00:30:00",
3143+
]
3144+
3145+
# dev shouldn't have been affected yet
3146+
assert _dates_in_table("test__dev.b") == [
3147+
"2024-01-01 00:30:00",
3148+
"2024-01-02 01:30:00",
3149+
"2024-01-03 02:30:00",
3150+
"2024-01-04 00:30:00",
3151+
]
3152+
3153+
# plan dev which should trigger the missing intervals to get repopulated
3154+
ctx.plan(environment="dev", auto_apply=True)
3155+
3156+
# dev should have fully refreshed
3157+
# this is proven by the fact that INCREMENTAL_BY_UNIQUE_KEY can't propagate deletes, so if the
3158+
# model was not fully rebuilt, the deleted record would still be present
3159+
for tbl in ["test.a", "test__dev.b"]:
3160+
assert _dates_in_table(tbl) == [
3161+
"2024-01-01 00:30:00",
3162+
"2024-01-03 02:30:00",
3163+
"2024-01-04 00:30:00",
3164+
]
3165+
3166+
30303167
def test_prod_restatement_plan_missing_model_in_dev(
30313168
tmp_path: Path,
30323169
):

0 commit comments

Comments
 (0)