Skip to content

Commit b4ddf75

Browse files
committed
Fix: Make plan dates relative to --execution-time and not the current time
1 parent 5b78b35 commit b4ddf75

7 files changed

Lines changed: 278 additions & 23 deletions

File tree

sqlmesh/core/context.py

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1513,7 +1513,11 @@ def plan_builder(
15131513
# If no end date is specified, use the max interval end from prod
15141514
# to prevent unintended evaluation of the entire DAG.
15151515
default_start, default_end = self._get_plan_default_start_end(
1516-
snapshots, max_interval_end_per_model, backfill_models, modified_model_names
1516+
snapshots,
1517+
max_interval_end_per_model,
1518+
backfill_models,
1519+
modified_model_names,
1520+
execution_time,
15171521
)
15181522

15191523
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
@@ -2818,6 +2822,7 @@ def _get_plan_default_start_end(
28182822
max_interval_end_per_model: t.Dict[str, int],
28192823
backfill_models: t.Optional[t.Set[str]],
28202824
modified_model_names: t.Set[str],
2825+
execution_time: t.Optional[TimeLike] = None,
28212826
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
28222827
if not max_interval_end_per_model:
28232828
return None, None
@@ -2843,6 +2848,12 @@ def _get_plan_default_start_end(
28432848
)
28442849
),
28452850
)
2851+
2852+
if execution_time and to_timestamp(default_end) > to_timestamp(execution_time):
2853+
# the end date can't be in the future, which can happen if a specific `execution_time` is set and prod intervals
2854+
# are newer than it
2855+
default_end = to_timestamp(execution_time)
2856+
28462857
return default_start, default_end
28472858

28482859
def _get_max_interval_end_per_model(

sqlmesh/core/plan/builder.py

Lines changed: 34 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
to_datetime,
4242
yesterday_ds,
4343
to_timestamp,
44+
time_like_to_str,
4445
)
4546
from sqlmesh.utils.errors import NoChangesPlanError, PlanError
4647

@@ -55,6 +56,7 @@ class PlanBuilder:
5556
start: The start time to backfill data.
5657
end: The end time to backfill data.
5758
execution_time: The date/time reference to use for execution time. Defaults to now.
59+
If :start or :end are relative time expressions, they are interpreted relative to :execution_time.
5860
apply: The callback to apply the plan.
5961
restate_models: A list of models for which the data should be restated for the time range
6062
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
@@ -172,6 +174,22 @@ def is_start_and_end_allowed(self) -> bool:
172174
"""Indicates whether this plan allows the start and end dates to be set."""
173175
return self._is_dev or bool(self._restate_models)
174176

177+
@property
178+
def start(self) -> t.Optional[TimeLike]:
179+
if self._start and self._execution_time:
180+
return to_datetime(self._start, relative_base=to_datetime(self._execution_time))
181+
return self._start
182+
183+
@property
184+
def end(self) -> t.Optional[TimeLike]:
185+
if self._end and self._execution_time:
186+
return to_datetime(self._end, relative_base=to_datetime(self._execution_time))
187+
return self._end
188+
189+
@property
190+
def execution_time(self) -> TimeLike:
191+
return self._execution_time or now()
192+
175193
def set_start(self, new_start: TimeLike) -> PlanBuilder:
176194
self._start = new_start
177195
self.override_start = True
@@ -256,7 +274,7 @@ def build(self) -> Plan:
256274
)
257275

258276
restatements = self._build_restatements(
259-
dag, earliest_interval_start(self._context_diff.snapshots.values())
277+
dag, earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time)
260278
)
261279
models_to_backfill = self._build_models_to_backfill(dag, restatements)
262280

@@ -269,8 +287,8 @@ def build(self) -> Plan:
269287
plan = Plan(
270288
context_diff=self._context_diff,
271289
plan_id=self._plan_id,
272-
provided_start=self._start,
273-
provided_end=self._end,
290+
provided_start=self.start,
291+
provided_end=self.end,
274292
is_dev=self._is_dev,
275293
skip_backfill=self._skip_backfill,
276294
empty_backfill=self._empty_backfill,
@@ -289,7 +307,7 @@ def build(self) -> Plan:
289307
selected_models_to_backfill=self._backfill_models,
290308
models_to_backfill=models_to_backfill,
291309
effective_from=self._effective_from,
292-
execution_time=self._execution_time,
310+
execution_time=self.execution_time,
293311
end_bounded=self._end_bounded,
294312
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
295313
user_provided_flags=self._user_provided_flags,
@@ -739,6 +757,18 @@ def _ensure_valid_date_range(self) -> None:
739757
"The start and end dates can't be set for a production plan without restatements."
740758
)
741759

760+
if (start := self.start) and (end := self.end):
761+
if to_datetime(start) > to_datetime(end):
762+
raise PlanError(
763+
f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'"
764+
)
765+
766+
if end := self.end:
767+
if to_datetime(end) > to_datetime(self.execution_time):
768+
raise PlanError(
769+
f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
770+
)
771+
742772
def _ensure_no_forward_only_revert(self) -> None:
743773
"""Ensures that a previously superseded breaking / non-breaking snapshot is not being
744774
used again to replace an existing forward-only snapshot with the same version.

sqlmesh/core/plan/definition.py

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
from datetime import datetime
66
from enum import Enum
77
from functools import cached_property
8+
from pydantic import Field
89

910
from sqlmesh.core.context_diff import ContextDiff
1011
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
@@ -63,7 +64,7 @@ class Plan(PydanticModel, frozen=True):
6364
models_to_backfill: t.Optional[t.Set[str]] = None
6465
"""All models that should be backfilled as part of this plan."""
6566
effective_from: t.Optional[TimeLike] = None
66-
execution_time: t.Optional[TimeLike] = None
67+
execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")
6768

6869
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
6970

@@ -80,7 +81,11 @@ def start(self) -> TimeLike:
8081

8182
@cached_property
8283
def end(self) -> TimeLike:
83-
return self.provided_end or now()
84+
return self.provided_end or self.execution_time
85+
86+
@property
87+
def execution_time(self) -> TimeLike:
88+
return self.execution_time_ or now()
8489

8590
@property
8691
def previous_plan_id(self) -> t.Optional[str]:
@@ -271,7 +276,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
271276

272277
@cached_property
273278
def _earliest_interval_start(self) -> datetime:
274-
return earliest_interval_start(self.snapshots.values())
279+
return earliest_interval_start(self.snapshots.values(), self.execution_time)
275280

276281

277282
class EvaluatablePlan(PydanticModel):
@@ -345,8 +350,10 @@ def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
345350
return format_intervals(self.merged_intervals, unit)
346351

347352

348-
def earliest_interval_start(snapshots: t.Collection[Snapshot]) -> datetime:
349-
earliest_start = earliest_start_date(snapshots)
353+
def earliest_interval_start(
354+
snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
355+
) -> datetime:
356+
earliest_start = earliest_start_date(snapshots, relative_to=execution_time)
350357
earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals]
351358
return (
352359
min(earliest_start, to_datetime(min(earliest_interval_starts)))

sqlmesh/core/snapshot/definition.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1995,7 +1995,12 @@ def earliest_start_date(
19951995
start_date(snapshot, snapshots, cache=cache, relative_to=relative_to)
19961996
for snapshot in snapshots.values()
19971997
)
1998-
return yesterday()
1998+
1999+
relative_base = None
2000+
if relative_to is not None:
2001+
relative_base = to_datetime(relative_to)
2002+
2003+
return yesterday(relative_base=relative_base)
19992004

20002005

20012006
def start_date(

sqlmesh/utils/date.py

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -87,34 +87,34 @@ def now_ds() -> str:
8787
return to_ds(now())
8888

8989

90-
def yesterday() -> datetime:
90+
def yesterday(relative_base: t.Optional[datetime] = None) -> datetime:
9191
"""
9292
Yesterday's UTC datetime, resolved relative to `relative_base` when one is given.
9393
9494
Returns:
9595
A datetime object with tz utc representing yesterday's date
9696
"""
97-
return to_datetime("yesterday")
97+
return to_datetime("yesterday", relative_base=relative_base)
9898

9999

100-
def yesterday_ds() -> str:
100+
def yesterday_ds(relative_base: t.Optional[datetime] = None) -> str:
101101
"""
102102
Yesterday's UTC ds (YYYY-MM-DD), resolved relative to `relative_base` when one is given.
103103
104104
Returns:
105105
Yesterday's ds string.
106106
"""
107-
return to_ds("yesterday")
107+
return to_ds("yesterday", relative_base=relative_base)
108108

109109

110-
def yesterday_timestamp() -> int:
110+
def yesterday_timestamp(relative_base: t.Optional[datetime] = None) -> int:
111111
"""
112112
Yesterday's UTC timestamp, resolved relative to `relative_base` when one is given.
113113
114114
Returns:
115115
UTC epoch millis timestamp of yesterday
116116
"""
117-
return to_timestamp(yesterday())
117+
return to_timestamp(yesterday(relative_base=relative_base))
118118

119119

120120
def to_timestamp(
@@ -265,19 +265,19 @@ def date_dict(
265265
return kwargs
266266

267267

268-
def to_ds(obj: TimeLike) -> str:
268+
def to_ds(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
269269
"""Converts a TimeLike object into YYYY-MM-DD formatted string."""
270-
return to_ts(obj)[0:10]
270+
return to_ts(obj, relative_base=relative_base)[0:10]
271271

272272

273-
def to_ts(obj: TimeLike) -> str:
273+
def to_ts(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
274274
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS formatted string."""
275-
return to_datetime(obj).replace(tzinfo=None).isoformat(sep=" ")
275+
return to_datetime(obj, relative_base=relative_base).replace(tzinfo=None).isoformat(sep=" ")
276276

277277

278-
def to_tstz(obj: TimeLike) -> str:
278+
def to_tstz(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
279279
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS+00:00 formatted string."""
280-
return to_datetime(obj).isoformat(sep=" ")
280+
return to_datetime(obj, relative_base=relative_base).isoformat(sep=" ")
281281

282282

283283
def is_date(obj: TimeLike) -> bool:

tests/core/test_context.py

Lines changed: 94 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
make_inclusive_end,
5050
now,
5151
to_date,
52+
to_datetime,
5253
to_timestamp,
5354
yesterday_ds,
5455
)
@@ -438,6 +439,99 @@ def test_plan_execution_time():
438439
)
439440

440441

442+
def test_plan_execution_time_start_end():
443+
context = Context(config=Config())
444+
context.upsert_model(
445+
load_sql_based_model(
446+
parse(
447+
"""
448+
MODEL(
449+
name db.x,
450+
start '2020-01-01',
451+
kind INCREMENTAL_BY_TIME_RANGE (
452+
time_column ds
453+
),
454+
cron '@daily'
455+
);
456+
457+
SELECT id, ds FROM (VALUES
458+
('1', '2020-01-01'),
459+
('2', '2021-01-01'),
460+
('3', '2022-01-01'),
461+
('4', '2023-01-01'),
462+
('5', '2024-01-01')
463+
) data(id, ds)
464+
WHERE ds BETWEEN @start_ds AND @end_ds
465+
"""
466+
)
467+
)
468+
)
469+
470+
# prod plan - no fixed execution time so it defaults to now() and reads all the data
471+
prod_plan = context.plan(auto_apply=True)
472+
473+
assert len(prod_plan.new_snapshots) == 1
474+
475+
context.upsert_model(
476+
load_sql_based_model(
477+
parse(
478+
"""
479+
MODEL(
480+
name db.x,
481+
start '2020-01-01',
482+
kind INCREMENTAL_BY_TIME_RANGE (
483+
time_column ds
484+
),
485+
cron '@daily'
486+
);
487+
488+
SELECT id, ds, 'changed' as a FROM (VALUES
489+
('1', '2020-01-01'),
490+
('2', '2021-01-01'),
491+
('3', '2022-01-01'),
492+
('4', '2023-01-01'),
493+
('5', '2024-01-01')
494+
) data(id, ds)
495+
WHERE ds BETWEEN @start_ds AND @end_ds
496+
"""
497+
)
498+
)
499+
)
500+
501+
# dev plan with an execution time in the past and no explicit start/end specified
502+
# the plan end should be bounded to it and not exceed it even though in prod the last interval (used as a default end)
503+
# is newer than the execution time
504+
dev_plan = context.plan("dev", execution_time="2020-01-05")
505+
506+
assert to_datetime(dev_plan.start) == to_datetime(
507+
"2020-01-01"
508+
) # default start is the earliest prod interval
509+
assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05")
510+
assert to_datetime(dev_plan.end) == to_datetime(
511+
"2020-01-05"
512+
) # end should not be greater than execution_time
513+
514+
# same as above but with a relative start
515+
dev_plan = context.plan("dev", start="1 day ago", execution_time="2020-01-05")
516+
517+
assert to_datetime(dev_plan.start) == to_datetime(
518+
"2020-01-04"
519+
) # start relative to execution_time
520+
assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05")
521+
assert to_datetime(dev_plan.end) == to_datetime(
522+
"2020-01-05"
523+
) # end should not be greater than execution_time
524+
525+
# same as above but with a relative start and a relative end
526+
dev_plan = context.plan("dev", start="2 days ago", execution_time="2020-01-05", end="1 day ago")
527+
528+
assert to_datetime(dev_plan.start) == to_datetime(
529+
"2020-01-03"
530+
) # start relative to execution_time
531+
assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05")
532+
assert to_datetime(dev_plan.end) == to_datetime("2020-01-04") # end relative to execution_time
533+
534+
441535
def test_override_builtin_audit_blocking_mode():
442536
context = Context(config=Config())
443537
context.upsert_model(

0 commit comments

Comments
 (0)