|
1 | 1 | import pytest |
| 2 | +import typing as t |
2 | 3 | from sqlglot import parse_one |
3 | 4 | from pytest_mock.plugin import MockerFixture |
4 | 5 |
|
|
7 | 8 | from sqlmesh.core.plan.stages import ( |
8 | 9 | build_plan_stages, |
9 | 10 | AfterAllStage, |
| 11 | + AuditOnlyRunStage, |
10 | 12 | PhysicalLayerUpdateStage, |
11 | 13 | BeforeAllStage, |
12 | 14 | BackfillStage, |
|
15 | 17 | RestatementStage, |
16 | 18 | MigrateSchemasStage, |
17 | 19 | ) |
18 | | -from sqlmesh.core.snapshot.definition import SnapshotChangeCategory, DeployabilityIndex, Snapshot |
| 20 | +from sqlmesh.core.snapshot.definition import ( |
| 21 | + SnapshotChangeCategory, |
| 22 | + DeployabilityIndex, |
| 23 | + Snapshot, |
| 24 | + SnapshotId, |
| 25 | +) |
19 | 26 | from sqlmesh.core.state_sync import StateReader |
20 | 27 | from sqlmesh.core.environment import Environment, EnvironmentStatements |
21 | 28 | from sqlmesh.utils.date import to_timestamp |
@@ -501,7 +508,6 @@ def test_build_plan_stages_forward_only( |
501 | 508 | new_snapshot_b.previous_versions = snapshot_b.all_versions |
502 | 509 | new_snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) |
503 | 510 |
|
504 | | - # Mock state reader to return existing snapshots and environment |
505 | 511 | state_reader = mocker.Mock(spec=StateReader) |
506 | 512 | state_reader.get_snapshots.return_value = {} |
507 | 513 | existing_environment = Environment( |
@@ -620,7 +626,6 @@ def test_build_plan_stages_forward_only_dev( |
620 | 626 | new_snapshot_b.previous_versions = snapshot_b.all_versions |
621 | 627 | new_snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) |
622 | 628 |
|
623 | | - # Mock state reader to return existing snapshots and environment |
624 | 629 | state_reader = mocker.Mock(spec=StateReader) |
625 | 630 | state_reader.get_snapshots.return_value = {} |
626 | 631 | state_reader.get_environment.return_value = None |
@@ -703,3 +708,107 @@ def test_build_plan_stages_forward_only_dev( |
703 | 708 | assert len(virtual_stage.promoted_snapshots) == 2 |
704 | 709 | assert len(virtual_stage.demoted_snapshots) == 0 |
705 | 710 | assert {s.name for s in virtual_stage.promoted_snapshots} == {'"a"', '"b"'} |
| 711 | + |
| 712 | + |
| 713 | +def test_build_plan_stages_audit_only( |
| 714 | + snapshot_a: Snapshot, snapshot_b: Snapshot, make_snapshot, mocker: MockerFixture |
| 715 | +) -> None: |
| 716 | + # Categorize snapshot_a as forward-only |
| 717 | + new_snapshot_a = make_snapshot( |
| 718 | + snapshot_a.model.copy(update={"audits": [("not_null", {})]}), |
| 719 | + ) |
| 720 | + new_snapshot_a.previous_versions = snapshot_a.all_versions |
| 721 | + new_snapshot_a.categorize_as(SnapshotChangeCategory.METADATA) |
| 722 | + new_snapshot_a.add_interval("2023-01-01", "2023-01-02") |
| 723 | + |
| 724 | + new_snapshot_b = make_snapshot( |
| 725 | + snapshot_b.model.copy(), |
| 726 | + nodes={'"a"': new_snapshot_a.model}, |
| 727 | + ) |
| 728 | + new_snapshot_b.previous_versions = snapshot_b.all_versions |
| 729 | + new_snapshot_b.categorize_as(SnapshotChangeCategory.METADATA) |
| 730 | + new_snapshot_b.add_interval("2023-01-01", "2023-01-02") |
| 731 | + |
| 732 | + def _get_snapshots(snapshot_ids: t.List[SnapshotId]) -> t.Dict[SnapshotId, Snapshot]: |
| 733 | + if snapshot_a.snapshot_id in snapshot_ids and snapshot_b.snapshot_id in snapshot_ids: |
| 734 | + return { |
| 735 | + snapshot_a.snapshot_id: snapshot_a, |
| 736 | + snapshot_b.snapshot_id: snapshot_b, |
| 737 | + } |
| 738 | + return {} |
| 739 | + |
| 740 | + state_reader = mocker.Mock(spec=StateReader) |
| 741 | + state_reader.get_snapshots.side_effect = _get_snapshots |
| 742 | + state_reader.get_environment.return_value = None |
| 743 | + |
| 744 | + # Create environment |
| 745 | + environment = Environment( |
| 746 | + name="dev", |
| 747 | + snapshots=[new_snapshot_a.table_info, new_snapshot_b.table_info], |
| 748 | + start_at="2023-01-01", |
| 749 | + end_at="2023-01-02", |
| 750 | + plan_id="test_plan", |
| 751 | + previous_plan_id=None, |
| 752 | + promoted_snapshot_ids=[new_snapshot_a.snapshot_id, new_snapshot_b.snapshot_id], |
| 753 | + ) |
| 754 | + |
| 755 | + # Create evaluatable plan |
| 756 | + plan = EvaluatablePlan( |
| 757 | + start="2023-01-01", |
| 758 | + end="2023-01-02", |
| 759 | + new_snapshots=[new_snapshot_a, new_snapshot_b], |
| 760 | + environment=environment, |
| 761 | + no_gaps=False, |
| 762 | + skip_backfill=False, |
| 763 | + empty_backfill=False, |
| 764 | + restatements={}, |
| 765 | + is_dev=True, |
| 766 | + allow_destructive_models=set(), |
| 767 | + forward_only=False, |
| 768 | + end_bounded=False, |
| 769 | + ensure_finalized_snapshots=False, |
| 770 | + directly_modified_snapshots=[new_snapshot_a.snapshot_id], |
| 771 | + indirectly_modified_snapshots={ |
| 772 | + new_snapshot_a.name: [new_snapshot_b.snapshot_id], |
| 773 | + }, |
| 774 | + removed_snapshots=[], |
| 775 | + requires_backfill=True, |
| 776 | + models_to_backfill=None, |
| 777 | + interval_end_per_model=None, |
| 778 | + execution_time="2023-01-02", |
| 779 | + disabled_restatement_models=set(), |
| 780 | + environment_statements=None, |
| 781 | + user_provided_flags=None, |
| 782 | + ) |
| 783 | + |
| 784 | + # Build plan stages |
| 785 | + stages = build_plan_stages(plan, state_reader, None) |
| 786 | + |
| 787 | + # Verify stages |
| 788 | + assert len(stages) == 5 |
| 789 | + |
| 790 | + # Verify PhysicalLayerUpdateStage |
| 791 | + physical_stage = stages[0] |
| 792 | + assert isinstance(physical_stage, PhysicalLayerUpdateStage) |
| 793 | + assert len(physical_stage.snapshots) == 0 |
| 794 | + |
| 795 | + # Verify AuditOnlyRunStage |
| 796 | + audit_only_stage = stages[1] |
| 797 | + assert isinstance(audit_only_stage, AuditOnlyRunStage) |
| 798 | + assert len(audit_only_stage.snapshots) == 1 |
| 799 | + assert audit_only_stage.snapshots[0].snapshot_id == new_snapshot_a.snapshot_id |
| 800 | + |
| 801 | + # Verify BackfillStage |
| 802 | + backfill_stage = stages[2] |
| 803 | + assert isinstance(backfill_stage, BackfillStage) |
| 804 | + assert len(backfill_stage.snapshot_to_intervals) == 0 |
| 805 | + |
| 806 | + # Verify EnvironmentRecordUpdateStage |
| 807 | + assert isinstance(stages[3], EnvironmentRecordUpdateStage) |
| 808 | + |
| 809 | + # Verify VirtualLayerUpdateStage |
| 810 | + virtual_stage = stages[4] |
| 811 | + assert isinstance(virtual_stage, VirtualLayerUpdateStage) |
| 812 | + assert len(virtual_stage.promoted_snapshots) == 2 |
| 813 | + assert len(virtual_stage.demoted_snapshots) == 0 |
| 814 | + assert {s.name for s in virtual_stage.promoted_snapshots} == {'"a"', '"b"'} |
0 commit comments