Skip to content

Commit 660be25

Browse files
authored
Fix: Make plan dates relative to --execution-time (#4702)
1 parent 2efe428 commit 660be25

9 files changed

Lines changed: 334 additions & 25 deletions

File tree

sqlmesh/core/context.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,14 @@
120120
from sqlmesh.utils import UniqueKeyDict, Verbosity
121121
from sqlmesh.utils.concurrency import concurrent_apply_to_values
122122
from sqlmesh.utils.dag import DAG
123-
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime, now_timestamp
123+
from sqlmesh.utils.date import (
124+
TimeLike,
125+
now_ds,
126+
to_timestamp,
127+
format_tz_datetime,
128+
now_timestamp,
129+
now,
130+
)
124131
from sqlmesh.utils.errors import (
125132
CircuitBreakerError,
126133
ConfigError,
@@ -1513,7 +1520,11 @@ def plan_builder(
15131520
# If no end date is specified, use the max interval end from prod
15141521
# to prevent unintended evaluation of the entire DAG.
15151522
default_start, default_end = self._get_plan_default_start_end(
1516-
snapshots, max_interval_end_per_model, backfill_models, modified_model_names
1523+
snapshots,
1524+
max_interval_end_per_model,
1525+
backfill_models,
1526+
modified_model_names,
1527+
execution_time or now(),
15171528
)
15181529

15191530
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
@@ -2818,6 +2829,7 @@ def _get_plan_default_start_end(
28182829
max_interval_end_per_model: t.Dict[str, int],
28192830
backfill_models: t.Optional[t.Set[str]],
28202831
modified_model_names: t.Set[str],
2832+
execution_time: t.Optional[TimeLike] = None,
28212833
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
28222834
if not max_interval_end_per_model:
28232835
return None, None
@@ -2843,6 +2855,12 @@ def _get_plan_default_start_end(
28432855
)
28442856
),
28452857
)
2858+
2859+
if execution_time and to_timestamp(default_end) > to_timestamp(execution_time):
2860+
# the end date can't be in the future, which can happen if a specific `execution_time` is set and prod intervals
2861+
# are newer than it
2862+
default_end = to_timestamp(execution_time)
2863+
28462864
return default_start, default_end
28472865

28482866
def _get_max_interval_end_per_model(

sqlmesh/core/plan/builder.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
to_datetime,
4242
yesterday_ds,
4343
to_timestamp,
44+
time_like_to_str,
45+
is_relative,
4446
)
4547
from sqlmesh.utils.errors import NoChangesPlanError, PlanError
4648

@@ -55,6 +57,7 @@ class PlanBuilder:
5557
start: The start time to backfill data.
5658
end: The end time to backfill data.
5759
execution_time: The date/time time reference to use for execution time. Defaults to now.
60+
If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time
5861
apply: The callback to apply the plan.
5962
restate_models: A list of models for which the data should be restated for the time range
6063
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
@@ -137,7 +140,14 @@ def __init__(
137140
self._include_unmodified = include_unmodified
138141
self._restate_models = set(restate_models) if restate_models is not None else None
139142
self._effective_from = effective_from
143+
144+
# note: this deliberately doesnt default to now() here.
145+
# There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run
146+
# so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None
147+
# in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past
148+
# ref: https://github.com/TobikoData/sqlmesh/pull/4702#discussion_r2140696156
140149
self._execution_time = execution_time
150+
141151
self._backfill_models = backfill_models
142152
self._end = end or default_end
143153
self._apply = apply
@@ -172,6 +182,26 @@ def is_start_and_end_allowed(self) -> bool:
172182
"""Indicates whether this plan allows to set the start and end dates."""
173183
return self._is_dev or bool(self._restate_models)
174184

185+
@property
186+
def start(self) -> t.Optional[TimeLike]:
187+
if self._start and is_relative(self._start):
188+
# only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
189+
return to_datetime(self._start, relative_base=to_datetime(self.execution_time))
190+
return self._start
191+
192+
@property
193+
def end(self) -> t.Optional[TimeLike]:
194+
if self._end and is_relative(self._end):
195+
# only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
196+
return to_datetime(self._end, relative_base=to_datetime(self.execution_time))
197+
return self._end
198+
199+
@cached_property
200+
def execution_time(self) -> TimeLike:
201+
# this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings
202+
# during the plan building process
203+
return self._execution_time or now()
204+
175205
def set_start(self, new_start: TimeLike) -> PlanBuilder:
176206
self._start = new_start
177207
self.override_start = True
@@ -256,7 +286,8 @@ def build(self) -> Plan:
256286
)
257287

258288
restatements = self._build_restatements(
259-
dag, earliest_interval_start(self._context_diff.snapshots.values())
289+
dag,
290+
earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
260291
)
261292
models_to_backfill = self._build_models_to_backfill(dag, restatements)
262293

@@ -266,11 +297,17 @@ def build(self) -> Plan:
266297
# model should be ignored.
267298
interval_end_per_model = None
268299

300+
# this deliberately uses the passed in self._execution_time and not self.execution_time cached property
301+
# the reason is because that there can be a delay between the Plan being built and the Plan being actually run,
302+
# so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to
303+
# the current timestamp of when the Plan is eventually run
304+
plan_execution_time = self._execution_time
305+
269306
plan = Plan(
270307
context_diff=self._context_diff,
271308
plan_id=self._plan_id,
272-
provided_start=self._start,
273-
provided_end=self._end,
309+
provided_start=self.start,
310+
provided_end=self.end,
274311
is_dev=self._is_dev,
275312
skip_backfill=self._skip_backfill,
276313
empty_backfill=self._empty_backfill,
@@ -289,7 +326,7 @@ def build(self) -> Plan:
289326
selected_models_to_backfill=self._backfill_models,
290327
models_to_backfill=models_to_backfill,
291328
effective_from=self._effective_from,
292-
execution_time=self._execution_time,
329+
execution_time=plan_execution_time,
293330
end_bounded=self._end_bounded,
294331
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
295332
user_provided_flags=self._user_provided_flags,
@@ -739,6 +776,18 @@ def _ensure_valid_date_range(self) -> None:
739776
"The start and end dates can't be set for a production plan without restatements."
740777
)
741778

779+
if (start := self.start) and (end := self.end):
780+
if to_datetime(start) > to_datetime(end):
781+
raise PlanError(
782+
f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'"
783+
)
784+
785+
if end := self.end:
786+
if to_datetime(end) > to_datetime(self.execution_time):
787+
raise PlanError(
788+
f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
789+
)
790+
742791
def _ensure_no_forward_only_revert(self) -> None:
743792
"""Ensures that a previously superseded breaking / non-breaking snapshot is not being
744793
used again to replace an existing forward-only snapshot with the same version.

sqlmesh/core/plan/definition.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import datetime
66
from enum import Enum
77
from functools import cached_property
8+
from pydantic import Field
89

910
from sqlmesh.core.context_diff import ContextDiff
1011
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
@@ -63,7 +64,7 @@ class Plan(PydanticModel, frozen=True):
6364
models_to_backfill: t.Optional[t.Set[str]] = None
6465
"""All models that should be backfilled as part of this plan."""
6566
effective_from: t.Optional[TimeLike] = None
66-
execution_time: t.Optional[TimeLike] = None
67+
execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")
6768

6869
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
6970

@@ -80,7 +81,12 @@ def start(self) -> TimeLike:
8081

8182
@cached_property
8283
def end(self) -> TimeLike:
83-
return self.provided_end or now()
84+
return self.provided_end or self.execution_time
85+
86+
@cached_property
87+
def execution_time(self) -> TimeLike:
88+
# note: property is cached so that it returns a consistent timestamp for now()
89+
return self.execution_time_ or now()
8490

8591
@property
8692
def previous_plan_id(self) -> t.Optional[str]:
@@ -271,7 +277,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
271277

272278
@cached_property
273279
def _earliest_interval_start(self) -> datetime:
274-
return earliest_interval_start(self.snapshots.values())
280+
return earliest_interval_start(self.snapshots.values(), self.execution_time)
275281

276282

277283
class EvaluatablePlan(PydanticModel):
@@ -345,8 +351,10 @@ def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
345351
return format_intervals(self.merged_intervals, unit)
346352

347353

348-
def earliest_interval_start(snapshots: t.Collection[Snapshot]) -> datetime:
349-
earliest_start = earliest_start_date(snapshots)
354+
def earliest_interval_start(
355+
snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
356+
) -> datetime:
357+
earliest_start = earliest_start_date(snapshots, relative_to=execution_time)
350358
earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals]
351359
return (
352360
min(earliest_start, to_datetime(min(earliest_interval_starts)))

sqlmesh/core/snapshot/definition.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1995,7 +1995,12 @@ def earliest_start_date(
19951995
start_date(snapshot, snapshots, cache=cache, relative_to=relative_to)
19961996
for snapshot in snapshots.values()
19971997
)
1998-
return yesterday()
1998+
1999+
relative_base = None
2000+
if relative_to is not None:
2001+
relative_base = to_datetime(relative_to)
2002+
2003+
return yesterday(relative_base=relative_base)
19992004

20002005

20012006
def start_date(

sqlmesh/utils/date.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,34 +87,34 @@ def now_ds() -> str:
8787
return to_ds(now())
8888

8989

90-
def yesterday() -> datetime:
90+
def yesterday(relative_base: t.Optional[datetime] = None) -> datetime:
9191
"""
9292
Yesterday utc datetime.
9393
9494
Returns:
9595
A datetime object with tz utc representing yesterday's date
9696
"""
97-
return to_datetime("yesterday")
97+
return to_datetime("yesterday", relative_base=relative_base)
9898

9999

100-
def yesterday_ds() -> str:
100+
def yesterday_ds(relative_base: t.Optional[datetime] = None) -> str:
101101
"""
102102
Yesterday utc ds.
103103
104104
Returns:
105105
Yesterday's ds string.
106106
"""
107-
return to_ds("yesterday")
107+
return to_ds("yesterday", relative_base=relative_base)
108108

109109

110-
def yesterday_timestamp() -> int:
110+
def yesterday_timestamp(relative_base: t.Optional[datetime] = None) -> int:
111111
"""
112112
Yesterday utc timestamp.
113113
114114
Returns:
115115
UTC epoch millis timestamp of yesterday
116116
"""
117-
return to_timestamp(yesterday())
117+
return to_timestamp(yesterday(relative_base=relative_base))
118118

119119

120120
def to_timestamp(
@@ -265,19 +265,19 @@ def date_dict(
265265
return kwargs
266266

267267

268-
def to_ds(obj: TimeLike) -> str:
268+
def to_ds(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
269269
"""Converts a TimeLike object into YYYY-MM-DD formatted string."""
270-
return to_ts(obj)[0:10]
270+
return to_ts(obj, relative_base=relative_base)[0:10]
271271

272272

273-
def to_ts(obj: TimeLike) -> str:
273+
def to_ts(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
274274
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS formatted string."""
275-
return to_datetime(obj).replace(tzinfo=None).isoformat(sep=" ")
275+
return to_datetime(obj, relative_base=relative_base).replace(tzinfo=None).isoformat(sep=" ")
276276

277277

278-
def to_tstz(obj: TimeLike) -> str:
278+
def to_tstz(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
279279
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS+00:00 formatted string."""
280-
return to_datetime(obj).isoformat(sep=" ")
280+
return to_datetime(obj, relative_base=relative_base).isoformat(sep=" ")
281281

282282

283283
def is_date(obj: TimeLike) -> bool:
@@ -373,6 +373,16 @@ def is_categorical_relative_expression(expression: str) -> bool:
373373
return not any(k in TIME_UNITS for k in grain_kwargs)
374374

375375

376+
def is_relative(value: TimeLike) -> bool:
377+
"""
378+
Tests a TimeLike object to see if it is a relative expression, eg '1 week ago' as opposed to an absolute timestamp
379+
"""
380+
if isinstance(value, str):
381+
return is_categorical_relative_expression(value)
382+
383+
return False
384+
385+
376386
def to_time_column(
377387
time_column: t.Union[TimeLike, exp.Null],
378388
time_column_type: exp.DataType,

0 commit comments

Comments
 (0)