Skip to content

Commit 4772446

Browse files
authored
Fix: Correctly categorize an orphaned indirectly modified snapshot created as a result of a merge of 2 or more directly modified parents (#4329)
1 parent 1780e5e commit 4772446

2 files changed

Lines changed: 187 additions & 1 deletion

File tree

sqlmesh/core/plan/builder.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,19 +592,97 @@ def _categorize_snapshot(
592592

593593
if snapshot.is_model and snapshot.model.forward_only:
594594
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
595-
elif not direct_parent_categories or direct_parent_categories.intersection(
595+
elif direct_parent_categories.intersection(
596596
{SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
597597
):
598598
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING)
599+
elif not direct_parent_categories:
600+
snapshot.categorize_as(self._get_orphaned_indirect_change_category(snapshot))
599601
elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories:
600602
# FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING
601603
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
604+
elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
605+
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
602606
else:
603607
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
604608
else:
605609
# Metadata updated.
606610
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
607611

612+
def _get_orphaned_indirect_change_category(
    self, indirect_snapshot: Snapshot
) -> SnapshotChangeCategory:
    """Infer the change category of an indirectly modified snapshot with no directly modified parents.

    Such an "orphaned" snapshot can appear when two or more parent models were changed
    independently in separate plans and those changes are later merged and applied in a
    single plan: the combination of already-known parent versions yields a downstream
    snapshot that has never been seen before.

    The category is derived by comparing each current parent with the parent version
    referenced by the previous version of ``indirect_snapshot`` and collecting the change
    categories of every parent version introduced in between.

    Args:
        indirect_snapshot: The indirectly modified snapshot to categorize. Its name must be
            a key of ``self._context_diff.modified_snapshots``.

    Returns:
        ``INDIRECT_BREAKING`` if a parent is new, if the previously referenced parent version
        is not found in the parent's version history, if any intermediate parent version was
        (indirectly) breaking, or if nothing more specific could be inferred. Otherwise
        ``FORWARD_ONLY``, ``INDIRECT_NON_BREAKING`` or ``METADATA``, in that order of precedence
        across all parents.
    """
    breaking_categories = {
        SnapshotChangeCategory.BREAKING,
        SnapshotChangeCategory.INDIRECT_BREAKING,
    }
    non_breaking_categories = {
        SnapshotChangeCategory.NON_BREAKING,
        SnapshotChangeCategory.INDIRECT_NON_BREAKING,
    }

    # The second element of the modified_snapshots entry is treated as the previous version.
    prior_snapshot = self._context_diff.modified_snapshots[indirect_snapshot.name][1]
    prior_parent_id_by_name = {parent_id.name: parent_id for parent_id in prior_snapshot.parents}

    # Only parents known to the context diff take part in the inference.
    known_snapshots = self._context_diff.snapshots
    current_parents = [
        known_snapshots[parent_id]
        for parent_id in indirect_snapshot.parents
        if parent_id in known_snapshots
    ]

    inferred_category: t.Optional[SnapshotChangeCategory] = None
    for parent in current_parents:
        if parent.name not in prior_parent_id_by_name:
            # A parent the previous version did not have: be conservative.
            return SnapshotChangeCategory.INDIRECT_BREAKING

        prior_parent_id = prior_parent_id_by_name[parent.name]
        if parent.snapshot_id == prior_parent_id:
            # This parent has not changed since the previous version of the snapshot.
            continue

        # Walk the parent's history from newest to oldest, gathering the categories of all
        # versions that are newer than the one the previous snapshot depended on.
        categories_since_prior = set()
        for version in reversed(parent.all_versions):
            if version.snapshot_id(parent.name) == prior_parent_id:
                break
            categories_since_prior.add(version.change_category)
        else:
            # The previously referenced parent version is not part of the history.
            return SnapshotChangeCategory.INDIRECT_BREAKING

        if categories_since_prior & breaking_categories:
            # At least one intermediate parent version was breaking.
            return SnapshotChangeCategory.INDIRECT_BREAKING

        if SnapshotChangeCategory.FORWARD_ONLY in categories_since_prior:
            # Forward-only wins over anything inferred from other parents.
            inferred_category = SnapshotChangeCategory.FORWARD_ONLY
        elif (
            categories_since_prior & non_breaking_categories
            and inferred_category != SnapshotChangeCategory.FORWARD_ONLY
        ):
            # Non-breaking changes must not downgrade an earlier forward-only verdict.
            inferred_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
        elif (
            categories_since_prior == {SnapshotChangeCategory.METADATA}
            and inferred_category is None
        ):
            # Metadata-only changes apply only if no other parent produced a verdict yet.
            inferred_category = SnapshotChangeCategory.METADATA

    return inferred_category or SnapshotChangeCategory.INDIRECT_BREAKING
685+
608686
def _apply_effective_from(self) -> None:
609687
if self._effective_from:
610688
if not self._forward_only:

tests/core/test_integration.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2247,6 +2247,114 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot_migration(
22472247
assert count > 0
22482248

22492249

2250+
@time_machine.travel("2023-01-08 15:00:00 UTC")
@pytest.mark.parametrize(
    "parent_a_category,parent_b_category,expected_child_category",
    [
        (
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.INDIRECT_BREAKING,
        ),
        (
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        ),
        (
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        ),
        (
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.INDIRECT_BREAKING,
        ),
        (
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.METADATA,
        ),
        (
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.METADATA,
        ),
        (
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.INDIRECT_BREAKING,
        ),
        (
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.NON_BREAKING,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        ),
        (
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.METADATA,
        ),
        (
            SnapshotChangeCategory.FORWARD_ONLY,
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.INDIRECT_BREAKING,
        ),
        (
            SnapshotChangeCategory.BREAKING,
            SnapshotChangeCategory.FORWARD_ONLY,
            SnapshotChangeCategory.FORWARD_ONLY,
        ),
        (
            SnapshotChangeCategory.FORWARD_ONLY,
            SnapshotChangeCategory.FORWARD_ONLY,
            SnapshotChangeCategory.FORWARD_ONLY,
        ),
    ],
)
def test_rebase_two_changed_parents(
    init_and_plan_context: t.Callable,
    parent_a_category: SnapshotChangeCategory,  # This change is deployed to prod first
    parent_b_category: SnapshotChangeCategory,  # This change is deployed to prod second
    expected_child_category: SnapshotChangeCategory,
):
    """Check child categories when two independently changed parents are rebased onto prod.

    Change A ("sushi.orders") and change B ("sushi.items") are each applied to their own dev
    environment. A is then promoted to prod, and finally a plan containing both A and B is
    built against prod. The downstream snapshots "sushi.order_items" and "sushi.top_waiters"
    are both expected to be categorized as ``expected_child_category`` in that plan.
    """
    context, initial_plan = init_and_plan_context("examples/sushi")
    context.apply(initial_plan)

    # Keep the pristine models around so each change can be reverted later.
    pristine_orders = context.get_model("sushi.orders")
    pristine_items = context.get_model("sushi.items")

    # Change A only -> dev_a
    context.upsert_model(pristine_orders.name, stamp="1")
    dev_a_builder = context.plan_builder("dev_a", skip_tests=True)
    dev_a_builder.set_choice(context.get_snapshot(pristine_orders.name), parent_a_category)
    context.apply(dev_a_builder.build())

    # Revert A, then change B only -> dev_b
    context.upsert_model(pristine_orders)
    context.upsert_model(pristine_items.name, stamp="1")
    dev_b_builder = context.plan_builder("dev_b", skip_tests=True)
    dev_b_builder.set_choice(context.get_snapshot(pristine_items.name), parent_b_category)
    context.apply(dev_b_builder.build())

    # Re-apply A, revert B, and promote A to prod
    context.upsert_model(pristine_orders.name, stamp="1")
    context.upsert_model(pristine_items)
    context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)

    # Add B on top of A and build (but do not apply) a plan against prod
    context.upsert_model(pristine_items.name, stamp="1")
    rebased_plan = context.plan_builder("prod", skip_tests=True).build()

    # Both the direct child and the indirect child must receive the expected category.
    for child_name in ("sushi.order_items", "sushi.top_waiters"):
        child_snapshot = rebased_plan.snapshots[context.get_snapshot(child_name).snapshot_id]
        assert child_snapshot.change_category == expected_child_category
2356+
2357+
22502358
@time_machine.travel("2023-01-08 15:00:00 UTC")
22512359
def test_unaligned_start_snapshot_with_non_deployable_downstream(init_and_plan_context: t.Callable):
22522360
context, _ = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)