Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@
from sqlmesh.utils import UniqueKeyDict, Verbosity
from sqlmesh.utils.concurrency import concurrent_apply_to_values
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime, now_timestamp
from sqlmesh.utils.date import (
TimeLike,
now_ds,
to_timestamp,
format_tz_datetime,
now_timestamp,
now,
)
from sqlmesh.utils.errors import (
CircuitBreakerError,
ConfigError,
Expand Down Expand Up @@ -1513,7 +1520,11 @@ def plan_builder(
# If no end date is specified, use the max interval end from prod
# to prevent unintended evaluation of the entire DAG.
default_start, default_end = self._get_plan_default_start_end(
snapshots, max_interval_end_per_model, backfill_models, modified_model_names
snapshots,
max_interval_end_per_model,
backfill_models,
modified_model_names,
execution_time or now(),
)

# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
Expand Down Expand Up @@ -2818,6 +2829,7 @@ def _get_plan_default_start_end(
max_interval_end_per_model: t.Dict[str, int],
backfill_models: t.Optional[t.Set[str]],
modified_model_names: t.Set[str],
execution_time: t.Optional[TimeLike] = None,
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
if not max_interval_end_per_model:
return None, None
Expand All @@ -2843,6 +2855,12 @@ def _get_plan_default_start_end(
)
),
)

if execution_time and to_timestamp(default_end) > to_timestamp(execution_time):
# the end date can't be in the future, which can happen if a specific `execution_time` is set and prod intervals
# are newer than it
default_end = to_timestamp(execution_time)

return default_start, default_end

def _get_max_interval_end_per_model(
Expand Down
57 changes: 53 additions & 4 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
to_datetime,
yesterday_ds,
to_timestamp,
time_like_to_str,
is_relative,
)
from sqlmesh.utils.errors import NoChangesPlanError, PlanError

Expand All @@ -55,6 +57,7 @@ class PlanBuilder:
start: The start time to backfill data.
end: The end time to backfill data.
execution_time: The date/time reference to use for execution time. Defaults to now.
If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time
apply: The callback to apply the plan.
restate_models: A list of models for which the data should be restated for the time range
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
Expand Down Expand Up @@ -137,7 +140,14 @@ def __init__(
self._include_unmodified = include_unmodified
self._restate_models = set(restate_models) if restate_models is not None else None
self._effective_from = effective_from

# note: this deliberately doesn't default to now() here.
# There may be a significant delay between the PlanBuilder producing a Plan and the Plan actually being run
# so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None
# in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past
# ref: https://github.com/TobikoData/sqlmesh/pull/4702#discussion_r2140696156
self._execution_time = execution_time

self._backfill_models = backfill_models
self._end = end or default_end
self._apply = apply
Expand Down Expand Up @@ -172,6 +182,26 @@ def is_start_and_end_allowed(self) -> bool:
"""Indicates whether this plan allows to set the start and end dates."""
return self._is_dev or bool(self._restate_models)

@property
def start(self) -> t.Optional[TimeLike]:
    """Return the plan start, resolving a relative expression against the execution time.

    Absolute values are handed back untouched: converting them could turn an inclusive
    date string like '2020-01-01' into an exclusive timestamp like '2020-01-01 00:00:00'.
    """
    raw_start = self._start
    if not raw_start or not is_relative(raw_start):
        return raw_start
    return to_datetime(raw_start, relative_base=to_datetime(self.execution_time))

@property
def end(self) -> t.Optional[TimeLike]:
    """Return the plan end, resolving a relative expression against the execution time.

    Absolute values are handed back untouched: converting them could turn an inclusive
    date string like '2020-01-01' into an exclusive timestamp like '2020-01-01 00:00:00'.
    """
    raw_end = self._end
    if not raw_end or not is_relative(raw_end):
        return raw_end
    return to_datetime(raw_end, relative_base=to_datetime(self.execution_time))

@cached_property
def execution_time(self) -> TimeLike:
    """Return the execution time to use while building the plan.

    Falls back to ``now()`` when no execution time was supplied. The result is cached so
    that every place resolving relative date strings during the plan building process
    sees the same, stable value.
    """
    provided = self._execution_time
    return provided if provided else now()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fall back to now() in constructor? So that we have it return the same value throughout the plan building process?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that seems to work. I've updated it

Copy link
Copy Markdown
Collaborator Author

@erindru erindru Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted defaulting to now() in the constructor and left a comment for next time, based on the conversation below


def set_start(self, new_start: TimeLike) -> PlanBuilder:
self._start = new_start
self.override_start = True
Expand Down Expand Up @@ -256,7 +286,8 @@ def build(self) -> Plan:
)

restatements = self._build_restatements(
dag, earliest_interval_start(self._context_diff.snapshots.values())
dag,
earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
)
models_to_backfill = self._build_models_to_backfill(dag, restatements)

Expand All @@ -266,11 +297,17 @@ def build(self) -> Plan:
# model should be ignored.
interval_end_per_model = None

# this deliberately uses the passed in self._execution_time and not self.execution_time cached property
# the reason is that there can be a delay between the Plan being built and the Plan being actually run,
# so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to
# the current timestamp of when the Plan is eventually run
plan_execution_time = self._execution_time

plan = Plan(
context_diff=self._context_diff,
plan_id=self._plan_id,
provided_start=self._start,
provided_end=self._end,
provided_start=self.start,
provided_end=self.end,
is_dev=self._is_dev,
skip_backfill=self._skip_backfill,
empty_backfill=self._empty_backfill,
Expand All @@ -289,7 +326,7 @@ def build(self) -> Plan:
selected_models_to_backfill=self._backfill_models,
models_to_backfill=models_to_backfill,
effective_from=self._effective_from,
execution_time=self._execution_time,
execution_time=plan_execution_time,
end_bounded=self._end_bounded,
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
user_provided_flags=self._user_provided_flags,
Expand Down Expand Up @@ -739,6 +776,18 @@ def _ensure_valid_date_range(self) -> None:
"The start and end dates can't be set for a production plan without restatements."
)

if (start := self.start) and (end := self.end):
if to_datetime(start) > to_datetime(end):
raise PlanError(
f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'"
)

if end := self.end:
if to_datetime(end) > to_datetime(self.execution_time):
raise PlanError(
f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
)

def _ensure_no_forward_only_revert(self) -> None:
"""Ensures that a previously superseded breaking / non-breaking snapshot is not being
used again to replace an existing forward-only snapshot with the same version.
Expand Down
18 changes: 13 additions & 5 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime
from enum import Enum
from functools import cached_property
from pydantic import Field

from sqlmesh.core.context_diff import ContextDiff
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
Expand Down Expand Up @@ -63,7 +64,7 @@ class Plan(PydanticModel, frozen=True):
models_to_backfill: t.Optional[t.Set[str]] = None
"""All models that should be backfilled as part of this plan."""
effective_from: t.Optional[TimeLike] = None
execution_time: t.Optional[TimeLike] = None
execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")

user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None

Expand All @@ -80,7 +81,12 @@ def start(self) -> TimeLike:

@cached_property
def end(self) -> TimeLike:
# Uses the explicitly provided end when it is truthy; otherwise falls back to the value on the right of `or`.
return self.provided_end or now()
return self.provided_end or self.execution_time

@cached_property
def execution_time(self) -> TimeLike:
    """Return the explicitly supplied execution time, or the current time when none was given.

    The property is cached so that the ``now()`` fallback yields one consistent timestamp.
    """
    explicit = self.execution_time_
    if explicit:
        return explicit
    return now()

@property
def previous_plan_id(self) -> t.Optional[str]:
Expand Down Expand Up @@ -271,7 +277,7 @@ def to_evaluatable(self) -> EvaluatablePlan:

@cached_property
def _earliest_interval_start(self) -> datetime:
# Delegates to the module-level earliest_interval_start over this plan's snapshots; the result is cached on the instance.
return earliest_interval_start(self.snapshots.values())
return earliest_interval_start(self.snapshots.values(), self.execution_time)


class EvaluatablePlan(PydanticModel):
Expand Down Expand Up @@ -345,8 +351,10 @@ def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
return format_intervals(self.merged_intervals, unit)


def earliest_interval_start(snapshots: t.Collection[Snapshot]) -> datetime:
earliest_start = earliest_start_date(snapshots)
def earliest_interval_start(
snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
) -> datetime:
earliest_start = earliest_start_date(snapshots, relative_to=execution_time)
earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals]
return (
min(earliest_start, to_datetime(min(earliest_interval_starts)))
Expand Down
7 changes: 6 additions & 1 deletion sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,12 @@ def earliest_start_date(
start_date(snapshot, snapshots, cache=cache, relative_to=relative_to)
for snapshot in snapshots.values()
)
return yesterday()

relative_base = None
if relative_to is not None:
relative_base = to_datetime(relative_to)

return yesterday(relative_base=relative_base)


def start_date(
Expand Down
34 changes: 22 additions & 12 deletions sqlmesh/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,34 +87,34 @@ def now_ds() -> str:
return to_ds(now())


def yesterday() -> datetime:
def yesterday(relative_base: t.Optional[datetime] = None) -> datetime:
"""
Yesterday utc datetime.

Args:
relative_base: Forwarded to `to_datetime` as `relative_base`; presumably the reference time that
"yesterday" is resolved against (TODO confirm). Defaults to None.

Returns:
A datetime object with tz utc representing yesterday's date
"""
return to_datetime("yesterday")
return to_datetime("yesterday", relative_base=relative_base)


def yesterday_ds() -> str:
def yesterday_ds(relative_base: t.Optional[datetime] = None) -> str:
"""
Yesterday utc ds.

Args:
relative_base: Forwarded to `to_ds` as `relative_base`; presumably the reference time that
"yesterday" is resolved against (TODO confirm). Defaults to None.

Returns:
Yesterday's ds string.
"""
return to_ds("yesterday")
return to_ds("yesterday", relative_base=relative_base)


def yesterday_timestamp() -> int:
def yesterday_timestamp(relative_base: t.Optional[datetime] = None) -> int:
"""
Yesterday utc timestamp.

Args:
relative_base: Forwarded to `yesterday` as `relative_base`; presumably the reference time that
"yesterday" is resolved against (TODO confirm). Defaults to None.

Returns:
UTC epoch millis timestamp of yesterday
"""
return to_timestamp(yesterday())
return to_timestamp(yesterday(relative_base=relative_base))


def to_timestamp(
Expand Down Expand Up @@ -265,19 +265,19 @@ def date_dict(
return kwargs


def to_ds(obj: TimeLike) -> str:
def to_ds(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
"""Converts a TimeLike object into YYYY-MM-DD formatted string.

The result is the first 10 characters of the `to_ts` output; `relative_base` is forwarded
to `to_ts` unchanged.
"""
return to_ts(obj)[0:10]
return to_ts(obj, relative_base=relative_base)[0:10]


def to_ts(obj: TimeLike) -> str:
def to_ts(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS formatted string.

The value is converted with `to_datetime` (which receives `relative_base`), its tzinfo is
dropped, and it is rendered via `isoformat` with a space separator.
"""
return to_datetime(obj).replace(tzinfo=None).isoformat(sep=" ")
return to_datetime(obj, relative_base=relative_base).replace(tzinfo=None).isoformat(sep=" ")


def to_tstz(obj: TimeLike) -> str:
def to_tstz(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str:
"""Converts a TimeLike object into YYYY-MM-DD HH:MM:SS+00:00 formatted string.

The value is converted with `to_datetime` (which receives `relative_base`) and rendered via
`isoformat` with a space separator, keeping its tzinfo.
"""
return to_datetime(obj).isoformat(sep=" ")
return to_datetime(obj, relative_base=relative_base).isoformat(sep=" ")


def is_date(obj: TimeLike) -> bool:
Expand Down Expand Up @@ -373,6 +373,16 @@ def is_categorical_relative_expression(expression: str) -> bool:
return not any(k in TIME_UNITS for k in grain_kwargs)


def is_relative(value: TimeLike) -> bool:
    """
    Report whether a TimeLike value is a relative expression, eg '1 week ago', rather than
    an absolute timestamp.

    Only strings can qualify; they are classified by `is_categorical_relative_expression`.
    Any non-string value is reported as not relative.
    """
    return isinstance(value, str) and is_categorical_relative_expression(value)


def to_time_column(
time_column: t.Union[TimeLike, exp.Null],
time_column_type: exp.DataType,
Expand Down
Loading