Skip to content

Commit 7e4e388

Browse files
committed
add tests for the plan stage builder
1 parent c504145 commit 7e4e388

1 file changed

Lines changed: 258 additions & 0 deletions

File tree

tests/core/test_plan_stages.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pytest_mock.plugin import MockerFixture
55

66
from sqlmesh.core.config import EnvironmentSuffixTarget
7+
from sqlmesh.core.config.common import VirtualEnvironmentMode
78
from sqlmesh.core.model import SqlModel, ModelKindName
89
from sqlmesh.core.plan.definition import EvaluatablePlan
910
from sqlmesh.core.plan.stages import (
@@ -1387,3 +1388,260 @@ def test_build_plan_stages_indirect_non_breaking_view_migration(
13871388

13881389
migrate_schemas_stage = stages[4]
13891390
assert {s.snapshot_id for s in migrate_schemas_stage.snapshots} == {new_snapshot_c.snapshot_id}
1391+
1392+
1393+
def test_build_plan_stages_virtual_environment_mode_filtering(
1394+
make_snapshot, mocker: MockerFixture
1395+
) -> None:
1396+
# Create snapshots with different virtual environment modes
1397+
snapshot_full = make_snapshot(
1398+
SqlModel(
1399+
name="full_model",
1400+
query=parse_one("select 1, ds"),
1401+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1402+
)
1403+
)
1404+
snapshot_full.virtual_environment_mode = VirtualEnvironmentMode.FULL
1405+
snapshot_full.categorize_as(SnapshotChangeCategory.BREAKING)
1406+
1407+
snapshot_dev_only = make_snapshot(
1408+
SqlModel(
1409+
name="dev_only_model",
1410+
query=parse_one("select 2, ds"),
1411+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1412+
)
1413+
)
1414+
snapshot_dev_only.virtual_environment_mode = VirtualEnvironmentMode.DEV_ONLY
1415+
snapshot_dev_only.categorize_as(SnapshotChangeCategory.BREAKING)
1416+
1417+
# Mock state reader
1418+
state_reader = mocker.Mock(spec=StateReader)
1419+
state_reader.get_snapshots.return_value = {}
1420+
state_reader.get_environment.return_value = None
1421+
1422+
# Test 1: Dev environment - both snapshots should be included
1423+
environment_dev = Environment(
1424+
name="dev",
1425+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1426+
start_at="2023-01-01",
1427+
end_at="2023-01-02",
1428+
plan_id="test_plan",
1429+
previous_plan_id=None,
1430+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1431+
)
1432+
1433+
plan_dev = EvaluatablePlan(
1434+
start="2023-01-01",
1435+
end="2023-01-02",
1436+
new_snapshots=[snapshot_full, snapshot_dev_only],
1437+
environment=environment_dev,
1438+
no_gaps=False,
1439+
skip_backfill=False,
1440+
empty_backfill=False,
1441+
restatements={},
1442+
is_dev=True,
1443+
allow_destructive_models=set(),
1444+
forward_only=False,
1445+
end_bounded=False,
1446+
ensure_finalized_snapshots=False,
1447+
directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1448+
indirectly_modified_snapshots={},
1449+
metadata_updated_snapshots=[],
1450+
removed_snapshots=[],
1451+
requires_backfill=True,
1452+
models_to_backfill=None,
1453+
execution_time="2023-01-02",
1454+
disabled_restatement_models=set(),
1455+
environment_statements=None,
1456+
user_provided_flags=None,
1457+
)
1458+
1459+
stages_dev = build_plan_stages(plan_dev, state_reader, None)
1460+
1461+
# Find VirtualLayerUpdateStage
1462+
virtual_stage_dev = next(
1463+
stage for stage in stages_dev if isinstance(stage, VirtualLayerUpdateStage)
1464+
)
1465+
1466+
# In dev environment, both snapshots should be promoted regardless of virtual_environment_mode
1467+
assert {s.name for s in virtual_stage_dev.promoted_snapshots} == {
1468+
'"full_model"',
1469+
'"dev_only_model"',
1470+
}
1471+
assert len(virtual_stage_dev.demoted_snapshots) == 0
1472+
1473+
# Test 2: Production environment - only FULL mode snapshots should be included
1474+
environment_prod = Environment(
1475+
name="prod",
1476+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1477+
start_at="2023-01-01",
1478+
end_at="2023-01-02",
1479+
plan_id="test_plan",
1480+
previous_plan_id=None,
1481+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1482+
)
1483+
1484+
plan_prod = EvaluatablePlan(
1485+
start="2023-01-01",
1486+
end="2023-01-02",
1487+
new_snapshots=[snapshot_full, snapshot_dev_only],
1488+
environment=environment_prod,
1489+
no_gaps=False,
1490+
skip_backfill=False,
1491+
empty_backfill=False,
1492+
restatements={},
1493+
is_dev=False,
1494+
allow_destructive_models=set(),
1495+
forward_only=False,
1496+
end_bounded=False,
1497+
ensure_finalized_snapshots=False,
1498+
directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1499+
indirectly_modified_snapshots={},
1500+
metadata_updated_snapshots=[],
1501+
removed_snapshots=[],
1502+
requires_backfill=True,
1503+
models_to_backfill=None,
1504+
execution_time="2023-01-02",
1505+
disabled_restatement_models=set(),
1506+
environment_statements=None,
1507+
user_provided_flags=None,
1508+
)
1509+
1510+
stages_prod = build_plan_stages(plan_prod, state_reader, None)
1511+
1512+
# Find VirtualLayerUpdateStage
1513+
virtual_stage_prod = next(
1514+
stage for stage in stages_prod if isinstance(stage, VirtualLayerUpdateStage)
1515+
)
1516+
1517+
# In production environment, only FULL mode snapshots should be promoted
1518+
assert {s.name for s in virtual_stage_prod.promoted_snapshots} == {'"full_model"'}
1519+
assert len(virtual_stage_prod.demoted_snapshots) == 0
1520+
1521+
# Test 3: Production environment with demoted snapshots
1522+
existing_environment = Environment(
1523+
name="prod",
1524+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1525+
start_at="2023-01-01",
1526+
end_at="2023-01-02",
1527+
plan_id="previous_plan",
1528+
previous_plan_id=None,
1529+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1530+
finalized_ts=to_timestamp("2023-01-02"),
1531+
)
1532+
state_reader.get_environment.return_value = existing_environment
1533+
1534+
# Remove both snapshots from the new environment
1535+
environment_prod_demote = Environment(
1536+
name="prod",
1537+
snapshots=[],
1538+
start_at="2023-01-01",
1539+
end_at="2023-01-02",
1540+
plan_id="test_plan",
1541+
previous_plan_id="previous_plan",
1542+
promoted_snapshot_ids=[],
1543+
)
1544+
1545+
plan_prod_demote = EvaluatablePlan(
1546+
start="2023-01-01",
1547+
end="2023-01-02",
1548+
new_snapshots=[],
1549+
environment=environment_prod_demote,
1550+
no_gaps=False,
1551+
skip_backfill=False,
1552+
empty_backfill=False,
1553+
restatements={},
1554+
is_dev=False,
1555+
allow_destructive_models=set(),
1556+
forward_only=False,
1557+
end_bounded=False,
1558+
ensure_finalized_snapshots=False,
1559+
directly_modified_snapshots=[],
1560+
indirectly_modified_snapshots={},
1561+
metadata_updated_snapshots=[],
1562+
removed_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1563+
requires_backfill=False,
1564+
models_to_backfill=None,
1565+
execution_time="2023-01-02",
1566+
disabled_restatement_models=set(),
1567+
environment_statements=None,
1568+
user_provided_flags=None,
1569+
)
1570+
1571+
stages_prod_demote = build_plan_stages(plan_prod_demote, state_reader, None)
1572+
1573+
# Find VirtualLayerUpdateStage
1574+
virtual_stage_prod_demote = next(
1575+
stage for stage in stages_prod_demote if isinstance(stage, VirtualLayerUpdateStage)
1576+
)
1577+
1578+
# In production environment, only FULL mode snapshots should be demoted
1579+
assert len(virtual_stage_prod_demote.promoted_snapshots) == 0
1580+
assert {s.name for s in virtual_stage_prod_demote.demoted_snapshots} == {'"full_model"'}
1581+
assert (
1582+
virtual_stage_prod_demote.demoted_environment_naming_info
1583+
== existing_environment.naming_info
1584+
)
1585+
1586+
1587+
def test_build_plan_stages_virtual_environment_mode_no_updates(
1588+
snapshot_a: Snapshot, make_snapshot, mocker: MockerFixture
1589+
) -> None:
1590+
# Create snapshot with DEV_ONLY mode
1591+
snapshot_dev_only = make_snapshot(
1592+
SqlModel(
1593+
name="dev_only_model",
1594+
query=parse_one("select 1, ds"),
1595+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1596+
)
1597+
)
1598+
snapshot_dev_only.virtual_environment_mode = VirtualEnvironmentMode.DEV_ONLY
1599+
snapshot_dev_only.categorize_as(SnapshotChangeCategory.BREAKING)
1600+
1601+
# Mock state reader
1602+
state_reader = mocker.Mock(spec=StateReader)
1603+
state_reader.get_snapshots.return_value = {}
1604+
state_reader.get_environment.return_value = None
1605+
1606+
# Production environment with only DEV_ONLY snapshots
1607+
environment = Environment(
1608+
name="prod",
1609+
snapshots=[snapshot_dev_only.table_info],
1610+
start_at="2023-01-01",
1611+
end_at="2023-01-02",
1612+
plan_id="test_plan",
1613+
previous_plan_id=None,
1614+
promoted_snapshot_ids=[snapshot_dev_only.snapshot_id],
1615+
)
1616+
1617+
plan = EvaluatablePlan(
1618+
start="2023-01-01",
1619+
end="2023-01-02",
1620+
new_snapshots=[snapshot_dev_only],
1621+
environment=environment,
1622+
no_gaps=False,
1623+
skip_backfill=False,
1624+
empty_backfill=False,
1625+
restatements={},
1626+
is_dev=False,
1627+
allow_destructive_models=set(),
1628+
forward_only=False,
1629+
end_bounded=False,
1630+
ensure_finalized_snapshots=False,
1631+
directly_modified_snapshots=[snapshot_dev_only.snapshot_id],
1632+
indirectly_modified_snapshots={},
1633+
metadata_updated_snapshots=[],
1634+
removed_snapshots=[],
1635+
requires_backfill=True,
1636+
models_to_backfill=None,
1637+
execution_time="2023-01-02",
1638+
disabled_restatement_models=set(),
1639+
environment_statements=None,
1640+
user_provided_flags=None,
1641+
)
1642+
1643+
stages = build_plan_stages(plan, state_reader, None)
1644+
1645+
# No VirtualLayerUpdateStage should be created since all snapshots are filtered out
1646+
virtual_stages = [stage for stage in stages if isinstance(stage, VirtualLayerUpdateStage)]
1647+
assert len(virtual_stages) == 0

0 commit comments

Comments
 (0)