|
46 | 46 | from pathlib import Path |
47 | 47 | from shutil import rmtree |
48 | 48 | from types import MappingProxyType |
| 49 | +from datetime import datetime |
49 | 50 |
|
50 | 51 | from sqlglot import Dialect, exp |
51 | 52 | from sqlglot.helper import first |
|
125 | 126 | format_tz_datetime, |
126 | 127 | now_timestamp, |
127 | 128 | now, |
| 129 | + to_datetime, |
| 130 | + make_exclusive, |
128 | 131 | ) |
129 | 132 | from sqlmesh.utils.errors import ( |
130 | 133 | CircuitBreakerError, |
@@ -1214,6 +1217,7 @@ def plan( |
1214 | 1217 | diff_rendered: t.Optional[bool] = None, |
1215 | 1218 | skip_linter: t.Optional[bool] = None, |
1216 | 1219 | explain: t.Optional[bool] = None, |
| 1220 | + min_intervals: t.Optional[int] = None, |
1217 | 1221 | ) -> Plan: |
1218 | 1222 | """Interactively creates a plan. |
1219 | 1223 |
|
@@ -1260,6 +1264,8 @@ def plan( |
1260 | 1264 | diff_rendered: Whether the diff should compare raw vs rendered models |
1261 | 1265 | skip_linter: Linter runs by default so this will skip it if enabled |
1262 | 1266 | explain: Whether to explain the plan instead of applying it. |
| 1267 | + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered |
| 1268 | + on every model when checking for missing intervals |
1263 | 1269 |
|
1264 | 1270 | Returns: |
1265 | 1271 | The populated Plan object. |
@@ -1288,6 +1294,7 @@ def plan( |
1288 | 1294 | diff_rendered=diff_rendered, |
1289 | 1295 | skip_linter=skip_linter, |
1290 | 1296 | explain=explain, |
| 1297 | + min_intervals=min_intervals, |
1291 | 1298 | ) |
1292 | 1299 |
|
1293 | 1300 | plan = plan_builder.build() |
@@ -1337,6 +1344,7 @@ def plan_builder( |
1337 | 1344 | diff_rendered: t.Optional[bool] = None, |
1338 | 1345 | skip_linter: t.Optional[bool] = None, |
1339 | 1346 | explain: t.Optional[bool] = None, |
| 1347 | + min_intervals: t.Optional[int] = None, |
1340 | 1348 | ) -> PlanBuilder: |
1341 | 1349 | """Creates a plan builder. |
1342 | 1350 |
|
@@ -1373,6 +1381,8 @@ def plan_builder( |
1373 | 1381 | enable_preview: Indicates whether to enable preview for forward-only models in development environments. |
1374 | 1382 | run: Whether to run latest intervals as part of the plan application. |
1375 | 1383 | diff_rendered: Whether the diff should compare raw vs rendered models |
| 1384 | + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered |
| 1385 | + on every model when checking for missing intervals |
1376 | 1386 |
|
1377 | 1387 | Returns: |
1378 | 1388 | The plan builder. |
@@ -1400,6 +1410,7 @@ def plan_builder( |
1400 | 1410 | "run": run, |
1401 | 1411 | "diff_rendered": diff_rendered, |
1402 | 1412 | "skip_linter": skip_linter, |
| 1413 | + "min_intervals": min_intervals, |
1403 | 1414 | } |
1404 | 1415 | user_provided_flags: t.Dict[str, UserProvidedFlags] = { |
1405 | 1416 | k: v for k, v in kwargs.items() if v is not None |
@@ -1522,6 +1533,16 @@ def plan_builder( |
1522 | 1533 | # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. |
1523 | 1534 | self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) |
1524 | 1535 |
|
| 1536 | + start_override_per_model = self._calculate_start_override_per_model( |
| 1537 | + min_intervals, |
| 1538 | + start or default_start, |
| 1539 | + end or default_end, |
| 1540 | + execution_time or now(), |
| 1541 | + backfill_models, |
| 1542 | + snapshots, |
| 1543 | + max_interval_end_per_model, |
| 1544 | + ) |
| 1545 | + |
1525 | 1546 | return self.PLAN_BUILDER_TYPE( |
1526 | 1547 | context_diff=context_diff, |
1527 | 1548 | start=start, |
@@ -1552,7 +1573,8 @@ def plan_builder( |
1552 | 1573 | ), |
1553 | 1574 | end_bounded=not run, |
1554 | 1575 | ensure_finalized_snapshots=self.config.plan.use_finalized_state, |
1555 | | - interval_end_per_model=max_interval_end_per_model, |
| 1576 | + start_override_per_model=start_override_per_model, |
| 1577 | + end_override_per_model=max_interval_end_per_model, |
1556 | 1578 | console=self.console, |
1557 | 1579 | user_provided_flags=user_provided_flags, |
1558 | 1580 | explain=explain or False, |
@@ -2827,15 +2849,15 @@ def _plan_preview_enabled(self) -> bool: |
2827 | 2849 | def _get_plan_default_start_end( |
2828 | 2850 | self, |
2829 | 2851 | snapshots: t.Dict[str, Snapshot], |
2830 | | - max_interval_end_per_model: t.Dict[str, int], |
| 2852 | + max_interval_end_per_model: t.Dict[str, datetime], |
2831 | 2853 | backfill_models: t.Optional[t.Set[str]], |
2832 | 2854 | modified_model_names: t.Set[str], |
2833 | 2855 | execution_time: t.Optional[TimeLike] = None, |
2834 | 2856 | ) -> t.Tuple[t.Optional[int], t.Optional[int]]: |
2835 | 2857 | if not max_interval_end_per_model: |
2836 | 2858 | return None, None |
2837 | 2859 |
|
2838 | | - default_end = max(max_interval_end_per_model.values()) |
| 2860 | + default_end = to_timestamp(max(max_interval_end_per_model.values())) |
2839 | 2861 | default_start: t.Optional[int] = None |
2840 | 2862 | # Infer the default start by finding the smallest interval start that corresponds to the default end. |
2841 | 2863 | for model_name in backfill_models or modified_model_names or max_interval_end_per_model: |
@@ -2864,19 +2886,76 @@ def _get_plan_default_start_end( |
2864 | 2886 |
|
2865 | 2887 | return default_start, default_end |
2866 | 2888 |
|
| 2889 | + def _calculate_start_override_per_model( |
| 2890 | + self, |
| 2891 | + min_intervals: t.Optional[int], |
| 2892 | + plan_start: t.Optional[TimeLike], |
| 2893 | + plan_end: t.Optional[TimeLike], |
| 2894 | + plan_execution_time: TimeLike, |
| 2895 | + backfill_model_fqns: t.Optional[t.Set[str]], |
| 2896 | + snapshots_by_model_fqn: t.Dict[str, Snapshot], |
| 2897 | + end_override_per_model: t.Optional[t.Dict[str, datetime]], |
| 2898 | + ) -> t.Dict[str, datetime]: |
| 2899 | + if not min_intervals or not backfill_model_fqns or not plan_start: |
| 2900 | + # If there are no models to backfill, there are no intervals to consider for backfill, so we dont need to consider a minimum number |
| 2901 | + # If the plan doesnt have a start date, all intervals are considered already so we dont need to consider a minimum number |
| 2902 | + # If we dont have a minimum number of intervals to consider, then we dont need to adjust the start date on a per-model basis |
| 2903 | + return {} |
| 2904 | + |
| 2905 | + start_overrides = {} |
| 2906 | + end_override_per_model = end_override_per_model or {} |
| 2907 | + |
| 2908 | + plan_execution_time_dt = to_datetime(plan_execution_time) |
| 2909 | + plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt) |
| 2910 | + plan_end_dt = to_datetime( |
| 2911 | + plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt |
| 2912 | + ) |
| 2913 | + |
| 2914 | + for model_fqn in backfill_model_fqns: |
| 2915 | + snapshot = snapshots_by_model_fqn.get(model_fqn) |
| 2916 | + if not snapshot: |
| 2917 | + continue |
| 2918 | + |
| 2919 | + starting_point = end_override_per_model.get(model_fqn, plan_end_dt) |
| 2920 | + if node_end := snapshot.node.end: |
| 2921 | + # if we dont do this, if the node end is a date (as opposed to a timestamp) |
| 2922 | + # we end up incorrectly winding back an extra day |
| 2923 | + node_end_dt = make_exclusive(node_end) |
| 2924 | + |
| 2925 | + if node_end_dt < plan_end_dt: |
| 2926 | + # if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals |
| 2927 | + # instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid |
| 2928 | + starting_point = node_end_dt |
| 2929 | + |
| 2930 | + snapshot_start = snapshot.node.cron_floor(starting_point) |
| 2931 | + |
| 2932 | + for _ in range(min_intervals): |
| 2933 | + # wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date |
| 2934 | + snapshot_start = snapshot.node.cron_prev(snapshot_start) |
| 2935 | + |
| 2936 | + # only consider this an override if the wound-back start date is earlier than the plan start date |
| 2937 | + # if it isnt then the plan already covers :min_intervals intervals for this snapshot |
| 2938 | + if snapshot_start < plan_start_dt: |
| 2939 | + start_overrides[model_fqn] = snapshot_start |
| 2940 | + |
| 2941 | + return start_overrides |
| 2942 | + |
def _get_max_interval_end_per_model(
    self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]]
) -> t.Dict[str, datetime]:
    """Fetch the max interval end per model from the state sync, converted with ``to_datetime``.

    Args:
        snapshots: Snapshots keyed by name, forwarded to ``_get_models_for_interval_end``.
        backfill_models: Optional set of models to backfill. When ``None``, no model filter
            is passed to the state sync.

    Returns:
        A mapping of model fqn to the ``to_datetime`` conversion of the value reported by
        ``state_sync.max_interval_end_per_model`` for ``c.PROD``.
    """
    models_for_interval_end = None
    if backfill_models is not None:
        models_for_interval_end = self._get_models_for_interval_end(snapshots, backfill_models)

    raw_interval_ends = self.state_sync.max_interval_end_per_model(
        c.PROD,
        models=models_for_interval_end,
        ensure_finalized_snapshots=self.config.plan.use_finalized_state,
    )

    # Convert every value the state sync returned so callers receive datetimes.
    interval_end_per_model = {}
    for fqn, raw_end in raw_interval_ends.items():
        interval_end_per_model[fqn] = to_datetime(raw_end)
    return interval_end_per_model
2880 | 2959 |
|
2881 | 2960 | @staticmethod |
2882 | 2961 | def _get_models_for_interval_end( |
|
0 commit comments